<img src="./images/Logo.png">

# Oil Price Analyzer
## A daily oil price analyzer and predictor

<br> <br>

It consists of a software pipeline that visualizes and predicts, on a daily basis, the petrol and diesel prices of roughly 600 fuel stations located in Catania.

# The structure

The pipeline is made of several dockerized components that pass data to one another, enriching it along the way.

<div align="center">
<img src="./images/Pipeline.jpg" width="70%"/>
</div>

# Data Format

Every day the Italian Ministry "Ministero delle Imprese e del Made in Italy" publishes the prices of oil and other fuels at 8 a.m. in **csv** format. In addition, a registry of the stations can be retrieved from the following <a href="https://www.mimit.gov.it/images/exportCSV/anagrafica_impianti_attivi.csv">link</a>.

<br> <br>

### Let's pay attention to the phrase above: "[...] the prices of oil and other fuels at 8 a.m.", **more on that later**

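## Sample of prezzo_alle_8.csv

The file starts with the extraction date (`Estrazione del`, "extracted on"), followed by one row per station, fuel and service type: station id (`idImpianto`), fuel (`descCarburante`, e.g. `Benzina` = petrol, `Gasolio` = diesel), price (`prezzo`), self-service flag (`isSelf`) and the time the price was communicated (`dtComu`).

```
Estrazione del 2023-07-24
idImpianto;descCarburante;prezzo;isSelf;dtComu
3464;Benzina;2.255;0;21/07/2023 20:30:06
3464;Benzina;1.945;1;21/07/2023 20:30:06
3464;Gasolio;2.109;0;21/07/2023 20:30:06
...
...
...
56860;Benzina;1.859;1;22/07/2023 10:14:47
56860;Gasolio;1.689;0;22/07/2023 10:14:47
56860;Gasolio;1.689;1;22/07/2023 10:14:47
```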
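To make the format concrete, here is a minimal plain-Python sketch (standard library only) that parses a few of the lines above. It is not part of the pipeline, where Logstash's csv codec does the parsing; the function name `parse_prices` and the English field names are hypothetical.

```python
import csv
import io
from datetime import datetime

# A few lines copied from the sample above; in the pipeline this text would
# come from the Ministry's prezzo_alle_8.csv download.
SAMPLE = """Estrazione del 2023-07-24
idImpianto;descCarburante;prezzo;isSelf;dtComu
3464;Benzina;2.255;0;21/07/2023 20:30:06
3464;Benzina;1.945;1;21/07/2023 20:30:06
56860;Gasolio;1.689;1;22/07/2023 10:14:47
"""


def parse_prices(text):
    """Return (extraction_date, rows) from the semicolon-separated price file."""
    lines = text.splitlines()
    # The first line ("Estrazione del <date>") is not part of the table.
    extraction_date = lines[0].replace("Estrazione del", "", 1).strip()
    reader = csv.DictReader(io.StringIO("\n".join(lines[1:])), delimiter=";")
    rows = []
    for row in reader:
        rows.append({
            "station_id": int(row["idImpianto"]),
            "fuel": row["descCarburante"],
            "price": float(row["prezzo"]),
            "self_service": row["isSelf"] == "1",
            # dtComu uses the day/month/year order
            "communicated_at": datetime.strptime(row["dtComu"], "%d/%m/%Y %H:%M:%S"),
        })
    return extraction_date, rows
```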

# Logstash

Logstash is the technology chosen for the **Data Ingestion** part of the project. The choice was mainly driven by its ease of use and by how easily its plugins integrate, two in particular: the **Http_poller input plugin**, and another one that we'll save for later (again...)

## Http_poller input plugin

This plugin makes it easy to issue HTTP GET/POST requests on a schedule, which can be expressed, for example, in crontab format.

Speaking of schedules, we now come to the main problem. Earlier we said "[...] the prices of oil and other fuels at 8 a.m.", which (sadly) doesn't mean that the file gets uploaded at 8 a.m.: it simply means that the prices in the csv file are the fuel prices at 8 a.m. on that file's day.
<br>

**TL;DR**: there's no fixed time of day at which the Ministry uploads the file.

# What's a scalable solution?

# Poll faster than the Nyquist rate (every 11 hours instead of 12) and deduplicate **somehow**

<div align="center"><img src="./images/memes/nyquist.jpeg" width="50%"></div>
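By analogy with the Nyquist criterion, a file that changes once every 24 hours should be sampled at least twice a day. The sketch below checks what the hour field `*/11` of the cron schedule actually gives, assuming standard cron semantics (the field matches every hour from 0 to 23 divisible by 11). The helper names are hypothetical.

```python
# Hours of the day matched by the cron hour field "*/11".
POLL_HOURS = [h for h in range(24) if h % 11 == 0]


def max_gap_hours(hours):
    """Longest wait between two consecutive polls, wrapping past midnight."""
    ordered = sorted(hours)
    gaps = [b - a for a, b in zip(ordered, ordered[1:])]
    gaps.append(24 - ordered[-1] + ordered[0])  # last poll of the day -> first poll of the next day
    return max(gaps)


def first_poll_after(publish_hour, hours):
    """Hours between a publication at `publish_hour` (0-24, may be fractional) and the next poll."""
    for h in sorted(hours):
        if h >= publish_hour:
            return h - publish_hour
    return 24 - publish_hour + min(hours)  # next poll is tomorrow's first one
```

With polls at 00:00, 11:00 and 22:00, a daily file is picked up at most 11 hours after it appears, and it is typically downloaded more than once while it stays online, which is exactly why deduplication is needed.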

# Logstash Filter Plugin - Fingerprint

This is the second plugin mentioned above. It adds a new field to each Logstash event containing the fingerprint of one or more existing fields, computed with an algorithm such as **SHA256**; when a `key` is set, the filter computes an HMAC instead of a plain hash.
<br>

This field will be crucial for **deduplication**.
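A rough Python equivalent of a keyed SHA256 fingerprint, using the standard library's `hmac` module and the same key as the configuration below. It is an illustration only: it hashes a plain string, whereas Logstash hashes the configured source field(s) with its own serialization, so the digests are not expected to match byte for byte. The function name `fingerprint` is hypothetical.

```python
import hashlib
import hmac

KEY = b"taptap"  # same key as in the Logstash fingerprint filter


def fingerprint(value: str, key: bytes = KEY) -> str:
    """Hex-encoded HMAC-SHA256 of `value`: identical inputs give identical fingerprints."""
    return hmac.new(key, value.encode("utf-8"), hashlib.sha256).hexdigest()
```

The property that matters for deduplication is determinism: polling the same file twice yields rows with the same fingerprint, while any change in a row yields a different one.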

# Final Logstash configuration

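```
input{
    http_poller{
        urls => {
            prices => {
                url => "https://www.mimit.gov.it/images/exportCSV/prezzo_alle_8.csv"
                method => get
            }
        }
        request_timeout => 60
        # The hour field "*/11" matches hours 0, 11 and 22,
        # so the file is polled at 00:00, 11:00 and 22:00 every day
        schedule => { cron => "0 */11 * * *" }
        codec => "csv"                  # parse the response body as CSV
    }
}

filter{
    fingerprint{
        method => "SHA256"
        source => ["event"]             # field(s) to fingerprint
        target => "hash"                # field that will hold the fingerprint
        key => "taptap"                 # with a key set, an HMAC-SHA256 is computed
    }
}

output{
    kafka{
        bootstrap_servers => "kafkaServer:9092"
        topic_id => "prices"
        codec => "json"
        message_key => "%{hash}"        # use the fingerprint as the Kafka message key
        max_request_size => 10485880    # allow producer requests of about 10 MB
    }
}
```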

# Kafka
Kafka is our **Data Streaming** tool. It stores messages durably and, with the appropriate settings, delivers every message to its destination at least once, even when things go badly; so it can certainly handle a few duplicated messages...

# KRaft
To avoid burdening the pipeline with an extra coordination service such as ZooKeeper, Kafka runs in **KRaft** mode, in which Kafka manages its own cluster metadata.

# The whole Kafka "Situation"

<div align="center"><img src="./images/memes/logstash to kafka.jpeg" width="40%"></div>

# Spark
**Spark** is the heart of the pipeline: it runs the processing logic, written as a **PySpark** program. Thanks to it:
- data are cleaned
- data are deduplicated
- data are enriched daily with predictions
- every station's dataset is updated
- the regressors are retrained for the next day to increase accuracy

# Cleaning and deduplication

As mentioned before, there's a problem of duplicate messages. To solve it, we rely on the simplest solution: **drop_duplicates**.

It is a Spark DataFrame method (an alias of `dropDuplicates`, also usable on streaming DataFrames) that, given a list of column names, removes the rows whose values in those columns are identical: this fits perfectly with the **hash** field that Logstash added for us before.

<div align="center"><img src="./images/memes/deduplicate.jpeg" width="70%"></div>
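In PySpark the call itself is a one-liner, `df.dropDuplicates(["hash"])`. To show what it does without a Spark cluster, the sketch below reproduces the idea in plain Python; `drop_duplicates_by` is a hypothetical helper. Unlike this sketch, Spark does not promise which of the duplicate rows survives, which should not matter here as long as the hash covers the whole row.

```python
def drop_duplicates_by(rows, key):
    """Keep the first row seen for each value of `key` and drop the others."""
    seen = set()
    unique = []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            unique.append(row)
    return unique
```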

# Prediction

A project goal is to predict the next day's prices for each station. 

To do so, each station has its own **Linear Regressor** in order to capture every price trend. 

After the prediction is added, each station's **"dataset"** is updated with the new price of the day, and all the regressors are **retrained** so that accuracy can improve day by day.
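The regressor's features are not spelled out here, so the following is only a minimal sketch under one plausible assumption: each station/fuel series is modelled as price versus day index with ordinary least squares, and "retraining" means refitting on the history extended with the latest price. It is pure Python; `fit_line`, `predict_next` and `update_and_predict` are hypothetical names.

```python
def fit_line(prices):
    """Ordinary least squares fit of price = intercept + slope * day, with day = 0, 1, 2, ..."""
    n = len(prices)
    if n == 0:
        raise ValueError("need at least one price")
    if n == 1:
        return prices[0], 0.0  # a single point: flat line through it
    mean_x = (n - 1) / 2  # mean of 0, 1, ..., n - 1
    mean_y = sum(prices) / n
    sxx = sum((x - mean_x) ** 2 for x in range(n))
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(prices))
    slope = sxy / sxx
    return mean_y - slope * mean_x, slope


def predict_next(prices):
    """Extrapolate the fitted line one day past the last observation."""
    intercept, slope = fit_line(prices)
    return intercept + slope * len(prices)


def update_and_predict(history, key, new_price):
    """Append today's price to the (station, fuel) series, refit, and predict tomorrow."""
    series = history.setdefault(key, [])
    series.append(new_price)
    return predict_next(series)
```

For example, `predict_next([1.80, 1.82, 1.84])` returns approximately 1.86.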

# Problems? A lot.

Using one regressor per station, though, is a **resource challenge**.

The "regular" way to do this would be to filter the DataFrame for each station and fuel, add the prediction to the resulting row, and then `union` that row into a new DataFrame which, in the end, would be an exact copy of the original plus the predictions.
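The cost difference can be sketched in plain Python: filtering once per (station, fuel) pair scans the whole dataset once per pair, while a single grouping pass visits each row only once. The helper below is hypothetical and only illustrates the one-pass idea; in Spark the analogous tool would be a grouped operation (for example `groupBy(...).applyInPandas(...)`), which is not necessarily what this project uses.

```python
from collections import defaultdict


def group_by_station_and_fuel(rows):
    """Single pass: bucket prices by (station_id, fuel) instead of one filter per key."""
    groups = defaultdict(list)
    for row in rows:
        groups[(row["station_id"], row["fuel"])].append(row["price"])
    return dict(groups)
```

With about 600 stations and several fuels each, the filter-per-key approach multiplies the number of scans by the number of keys, while grouping keeps it at one.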

<div align="center"><img src="./images/memes/outofmemory.jpeg" width="40%"></div>

# "Solution"

<div align="center"><img src="./images/memes/pandas.jpeg" width="40%"></div>

# Spark - Pandas - Parquet

In truth, the real solution came from the compressed **Apache Parquet** format, which can be used in RAM as the backend for converting the Spark Streaming DataFrame into a pandas DataFrame.

# Elasticsearch and Kibana

Elasticsearch is the **storage** of the pipeline, while Kibana is the **visualization tool**.

The choice was easy: both are part of the **Elastic** stack (together with Logstash), they are tightly integrated with each other, and they make everything very easy to do.

# The end