# INGESTION + ETL + DB Inquinamento

## Scraper

In [None]:
from datetime import datetime, timedelta
import json

import requests
from bs4 import BeautifulSoup, Tag
from tqdm import tqdm

#Scraper functions

def _get_table_row_values(row:Tag) -> dict:
    """
    Return a dict like: {"pollutant": value, (...)}
    """
    inquinanti = [
        "Biossido di Zolfo",
        "Polveri < 10",
        "Polveri < 2.5",
        "Biossido di Azoto",
        "Monossido di carbonio",
        "Ozono",
        "Benzene"
    ]
    cells = row.select("td")[1:]
    row_value = {}
    for pollutant, cell in zip(inquinanti, cells):
        row_value[pollutant] = cell.text
    return row_value

def get_table_day_values(day: datetime) -> dict | None:
    """
    Return a dict like: {"station_address": {"pollutant": value, (...)}, (...)}
    """

    url = f"https://www.amat-mi.it/index.php?id_sezione=35&data_bollettino={day.year}-{day.month}-{day.day}"
    main_page = requests.get(url)
    main_page = BeautifulSoup(main_page.text)
    table = main_page.select_one(".table")
    if not table:
        return None

    table_values = {}
    stations = [
        "Viale Liguria",
        "Viale Marche",
        "Via Pascal",
        "Via Senato",
        "Verziere"
    ]
    rows = table.select("tr")[2:7]
    for station, row in zip(stations, rows):
        table_values[station] = _get_table_row_values(row)
    return table_values

def get_days_list(start: datetime, end: datetime):
    numdays = (end - start).days
    return [start + timedelta(days=x) for x in range(numdays + 1)]

# Other functions

def pad2(day_or_month: int) -> str:
    response = str(day_or_month)
    if len(response) > 1:
        return response
    return f"0{response}"

days = get_days_list(datetime(2023, 1, 1), datetime(2023, 12, 31))
db = []
pbar = tqdm(total=len(days), desc="Scraping progress...")
for day in days:
    table_value = get_table_day_values(day)
    if table_value:
        db.append({"Date": f"{day.year}-{pad2(day.month)}-{pad2(day.day)}", "Stations": table_value})
    pbar.update(1)

In [None]:
PATH_OUT = "./Data/Raw/Inquinamento/inquinamento.json"
with open(PATH_OUT, "w") as file:
    json.dump(db, file, indent=4)

### Normalizzazione

In [None]:
import pandas as pd
import json

# Load raw files to normalizations
PATH_RELEVATIONS = "./Data/Raw/Inquinamento/inquinamento.json"
PATH_STATIONS = "./Data/Raw/Inquinamento/inquinamento_stazione.csv"
PATH_MOLECULE = "./Data/Raw/Inquinamento/inquinamento_inquinante.csv"

with open(PATH_RELEVATIONS, 'r') as file:
    rilevations:list = json.load(file)
stations = pd.read_csv(PATH_STATIONS)
inquinanti_df = pd.read_csv(PATH_MOLECULE)

# Start operations
bollettino_csv = [["id_bollettino", "data"]]
misurazione_csv = [["id_bollettino", "id_stazione", "id_inquinante", "valore"]]
for i, rilevation in enumerate(rilevations):
    id_bollettino = i + 1
    bollettino_csv.append([id_bollettino, rilevation["Date"]])
    for nome_stazione, inquinanti in rilevation["Stations"].items():
        id_station = stations[stations["nome"] == nome_stazione]["id_stazione"].values[0]
        for nome_inquinante, valore in inquinanti.items():
            id_inquinante = inquinanti_df[inquinanti_df["nome"] == nome_inquinante]["id_inquinante"].values[0]
            misurazione_csv.append([id_bollettino, id_station, id_inquinante, valore])

In [None]:
PATH_BOLLETTINO_OUT = "./Data/Raw/Inquinamento/bollettino.csv"
PATH_MISURAZIONE_OUT = "./Data/Raw/Inquinamento/misurazione.csv"
pd.DataFrame(bollettino_csv).to_csv(PATH_BOLLETTINO_OUT, index=False, header=False)
pd.DataFrame(misurazione_csv).to_csv(PATH_MISURAZIONE_OUT, index=False, header=False)

## ELT

### Pulizia tabella misurazione

In [None]:
import pandas as pd

def lambda_valore(x):
    valore = x.strip("\n").strip("\r").strip().replace(",", ".")
    try:
        valore = float(valore)
    except:
        if "<" in valore:
            valore = 0.0
        else:
            valore = None
    return valore

PATH_MISURAZIONE = "./Data/Raw/Inquinamento/misurazione.csv"

misurazione = pd.read_csv(PATH_MISURAZIONE)

misurazione["valore"] = misurazione["valore"].apply(lambda_valore)
misurazione = misurazione[misurazione["valore"].notna()]

In [None]:
PATH_MISURAZIONE_OUT = "./Data/Clean/Inquinamento/inquinamento_misurazione_clean.csv"
misurazione.to_csv(PATH_MISURAZIONE_OUT, index=False)

### Pulizia tabelle e caricamento in Clean

In [None]:
# Nessuna pulizia necessaria al momento. Carico direttamente.

PATH_BOLLETTINO = "./Data/Raw/Inquinamento/bollettino.csv"
PATH_BOLLETTINO_OUT = "./Data/Clean/Inquinamento/inquinamento_bollettino_clean.csv"
pd.read_csv(PATH_BOLLETTINO).to_csv(PATH_BOLLETTINO_OUT, index=False)

PATH_STAZIONE = "./Data/Raw/Inquinamento/inquinamento_stazione.csv"
PATH_STAZIONE_OUT = "./Data/Clean/Inquinamento/inquinamento_stazione_clean.csv"
pd.read_csv(PATH_STAZIONE).to_csv(PATH_STAZIONE_OUT, index=False)

PATH_INQUINANTE = "./Data/Raw/Inquinamento/inquinamento_inquinante.csv"
PATH_INQUINANTE_OUT = "./Data/Clean/Inquinamento/inquinamento_inquinante_clean.csv"
pd.read_csv(PATH_INQUINANTE).to_csv(PATH_INQUINANTE_OUT, index=False)

## Creation SQLITE DB

In [None]:
import sqlite3

import pandas as pd

def get_schema_table(drop_command:str, create_command:str, insert_command:str, rows :list[tuple]) -> list[str]:

    for row in rows:
        insert_command = insert_command + "("
        for element in row:
            if isinstance(element, str):
                element = "'" + element + "'"
            insert_command = insert_command + f"{element},"
        insert_command = insert_command.strip(",") + "),"
    insert_command = insert_command.strip(",") + ";"

    return [drop_command, create_command, insert_command]

def execute_query_list(queries: list[str]):
    response = []
    with sqlite3.connect(PATH_DB) as connection:
        for query in queries:
            cursor = connection.cursor()
            result = cursor.execute(query)
            response.append(result.fetchall())
            connection.commit()
    return response

# Set file paths
PATH_DB = "./Data/Clean/Inquinamento/inquinamento.db"
PATH_BOLLETTINO_CLEAN = "./Data/Clean/Inquinamento/inquinamento_bollettino_clean.csv"
PATH_STAZIONE_CLEAN = "./Data/Clean/Inquinamento/inquinamento_stazione_clean.csv"
PATH_INQUINANTE_CLEAN = "./Data/Clean/Inquinamento/inquinamento_inquinante_clean.csv"
PATH_MISURAZIONE_CLEAN = "./Data/Clean/Inquinamento/inquinamento_misurazione_clean.csv"
PATH_SOGLIA_CLEAN = "./Data/Clean/Inquinamento/inquinamento_soglia_clean.csv"

tables = {}
# set commands tables
tables["bollettino"] = {
    "commands":{
        "drop": "DROP TABLE IF EXISTS bollettino;",
        "create": """CREATE TABLE bollettino ("id_bollettino" INT PRIMARY KEY,"data" DATE NULL);""",
        "insert": """INSERT INTO bollettino ("id_bollettino","data") VALUES\n"""
    }
}
tables["stazione"] = {
    "commands": {
        "drop": "DROP TABLE IF EXISTS stazione;",
        "create": """
            CREATE TABLE stazione (
                "id_stazione" INT PRIMARY KEY,
                "nome" VARCHAR(255) NULL,
                "latitude" NUMERIC NULL,
                "longitute" NUMERIC NULL
            );
        """,
        "insert":"""INSERT INTO stazione ("id_stazione","nome","latitude","longitute") VALUES\n"""
    }
}
tables["inquinante"] = {
    "commands": {
        "drop": "DROP TABLE IF EXISTS inquinante;",
        "create": """
            CREATE TABLE inquinante (
                "id_inquinante" INT PRIMARY KEY,
                "nome" VARCHAR(255) NULL,
                "simbolo" VARCHAR(255) NULL,
                "unita_di_misura" VARCHAR(255) NULL,
                "media_temporale" VARCHAR(255) NULL
            );
        """,
        "insert":"""INSERT INTO inquinante ("id_inquinante","nome","simbolo","unita_di_misura","media_temporale") VALUES\n"""
    }
}
tables["misurazione"] = {
    "commands": {
        "drop": """DROP TABLE IF EXISTS misurazione;""",
        "create": """
            CREATE TABLE misurazione (
                "id_bollettino" INT NOT NULL,
                "id_stazione" INT NOT NULL,
                "id_inquinante" INT NOT NULL,
                "valore" NUMERIC NULL,
                FOREIGN KEY(id_bollettino) REFERENCES bollettino(id_bollettino),
                FOREIGN KEY(id_stazione) REFERENCES stazione(id_stazione),
                FOREIGN KEY(id_inquinante) REFERENCES inquinante(id_inquinante)
            );
        """,
        "insert": """INSERT INTO misurazione ("id_bollettino","id_stazione","id_inquinante","valore") VALUES\n"""
    }
}
tables["soglia"] = {
    "commands": {
        "drop": """DROP TABLE IF EXISTS soglia;""",
        "create": """
            CREATE TABLE soglia (
                "id_inquinante" INT NOT NULL,
                "tipo_soglia" VARCHAR(255) NULL,
                "valore" INT NULL,
                "unita_di_misura" VARCHAR(255) NULL,
                "periodo_di_riferimento" VARCHAR(255) NULL,
                "max_superamenti_anno" INT NULL
            );
        """,
        "insert": """INSERT INTO soglia (
                        "id_inquinante",
                        "tipo_soglia",
                        "valore",
                        "unita_di_misura",
                        "periodo_di_riferimento",
                        "max_superamenti_anno"
                    ) VALUES\n"""
    }
}

# set rows from df
tables["bollettino"]["rows"] = list(pd.read_csv(PATH_BOLLETTINO_CLEAN).itertuples(index=False, name=None))
tables["stazione"]["rows"] = list(pd.read_csv(PATH_STAZIONE_CLEAN).itertuples(index=False, name=None))
tables["inquinante"]["rows"] = list(pd.read_csv(PATH_INQUINANTE_CLEAN).itertuples(index=False, name=None))
tables["misurazione"]["rows"] = list(pd.read_csv(PATH_MISURAZIONE_CLEAN).itertuples(index=False, name=None))
tables["soglia"]["rows"] = list(pd.read_csv(PATH_SOGLIA_CLEAN).itertuples(index=False, name=None))

# Create schema query
total_query = []
for v in tables.values():
    total_query.extend(get_schema_table(v["commands"]["drop"], v["commands"]["create"], v["commands"]["insert"], v["rows"]))

# Execute query
query_result = execute_query_list(total_query)

# Nuovo Inquinamento

## Scraping

Otteniamo gli id delle stazioni di Milano

In [16]:
import requests

URL = "https://airnet.waqi.info/airnet/validator/check/112273"
risposta = requests.get(URL)
risposta_json = risposta.json()
neighbors = risposta_json["data"]["neighbors"]

inizializziamo il geojson con id e coordinate delle stazioni

In [None]:
geojson = {
    "type": "FeatureCollection",
    "name": "Inquinamento",
    "crs": {
        "type": "name",
        "properties": {
            "name": "urn:ogc:def:crs:EPSG::6707"
        }
    },
    "features": []
}
for geo, id in zip(neighbors["geos"], neighbors["ids"]):
    if not id == 353767: # Stazione marco emme, Milan, Italy che non contiene dati pm10 e pm25
        geojson["features"].append(
            {
                "type": "Feature",
                "properties": {
                    "id": id,
                    "longitudine": geo[1],
                    "latitudine": geo[0],
                    "name": "",
                    "pm10": [],
                    "pm25": []
                },
                "geometry": {
                    "type": "Point",
                    "coordinates": [geo[1], geo[0]]
                }
            }
        )

Estraiamo i dati di ogni stazione (per i 2 inquinanti) e li inseriamo direttamente tra le "features" >> "properties" del geojson inizializzato

In [18]:
import json

from sseclient import SSEClient
from tqdm import tqdm

pbar = tqdm(total=len(geojson["features"]), desc="Scraping progress...")

for feature in geojson["features"]:
    for inquinante in ["pm10", "pm25"]:
        url = f"https://airnet.waqi.info/airnet/sse/historic/daily/{feature["properties"]["id"]}?specie={inquinante}"
        response = requests.get(url, params={"specie": inquinante}, headers={"Accept": "text/event-stream"}, stream=True)
        client = SSEClient(response)
        for i, event in enumerate(client.events()):
            if event.event != "error":
                try:
                    data = json.loads(event.data)
                    if i == 0:
                        feature["properties"]["name"] = data["meta"]["name"]
                    elif isinstance(data, dict): #Ogni tanto ci sono righe sporche di stringhe
                        feature["properties"][inquinante].append(data)
                except json.JSONDecodeError:
                    print("Errore JSON:", event.data)
    pbar.update(1)

Scraping progress...:   6%|▌         | 22/365 [00:32<08:28,  1.48s/it]
Scraping progress...: 100%|██████████| 19/19 [00:57<00:00,  3.22s/it]

salvo il file geojson creato in Clean

In [None]:
PATH_INQUINAMENTO_GEOJSON_STAGING = "./Data/Raw/Inquinamento/inquinamento_staging.geojson"
with open(PATH_INQUINAMENTO_GEOJSON_STAGING, 'w') as file:
    json.dump(geojson, file, indent=4)

## Elaborazione dati

In [None]:
import json

import pandas as pd

PATH_INQUINAMENTO_GEOJSON_STAGING = "./Data/Raw/Inquinamento/inquinamento_staging.geojson"
with open(PATH_INQUINAMENTO_GEOJSON_STAGING, 'r') as file:
    geojson = json.load(file)
features = geojson["features"]

for feature in geojson["features"]:
    for data in ["pm10", "pm25"]:
        df = pd.DataFrame(feature["properties"][data])
        df['day'] = pd.to_datetime(df['day'])
        filtered_df = df[df['day'].dt.year.isin([2023])].reset_index()
        if len(filtered_df) > 200:
            media_median = filtered_df["median"].mean()
            feature["properties"][data] = media_median
        else:
            feature["properties"][data] = None

In [None]:
PATH_INQUINAMENTO_GEOJSON_CLEAN = "./Data/Clean/Inquinamento/inquinamento.geojson"
with open(PATH_INQUINAMENTO_GEOJSON_CLEAN, 'w') as file:
    json.dump(geojson, file, indent=4)