Usa le API per scaricare da Open Data Lombardia le informazioni sulle posizioni e il tipo dei sensori sia per il meteo che per la qualità dell'aria.

Crea le tabelle e le salva nel percorso utilizzato poi da NiFi per eseguire le varie fasi di enrichment.
In particolare coompone anche le due tabelle andando ad associare ad ogni postazione della qualità dell'aria, la postazione meteo più vicina.



Api key:

In [2]:
api_aria = "ib47-atvt"
api_meteo = "nf78-nj6b"

Preparazione Client e librerie

In [3]:
!pip install sodapy
!pip install geopy 

import pandas as pd
from sodapy import Socrata
from geopy.distance import geodesic

# Authenticated client (needed for non-public datasets):
client = Socrata("www.dati.lombardia.it",
                 "OgNGi2gJVq7zGzRRdpCPmK3HM",
                  username="42dr396@gmail.com",
                  password="qLGKdDrdjk.3.SG")


Collecting sodapy
  Downloading https://files.pythonhosted.org/packages/9e/74/95fb7d45bbe7f1de43caac45d7dd4807ef1e15881564a00eef489a3bb5c6/sodapy-2.1.0-py2.py3-none-any.whl
Installing collected packages: sodapy
Successfully installed sodapy-2.1.0


## Aria



Per prima cosa si sono selezionati i sensori in modo che fossero adatti all'analisi presa in considerazione. 
In particolare si sono scelti i sensori non 'storici', ovvero solo quelli che sono tutt'ora attivi, visto che l'analisi si focalizza sui dati del 2020. Ma allo stesso modo, visto che si vogliono confrontare i risultati con lo storico degli ultimi 5 anni si sono selezionati i sensori in base alla 'datastart', ovvero in modo che fossero attivi per tutto il periodo selezionato.

La selezione è avvenuta tramite una query nel linguaggio SoQL 


In [4]:
# query in SoQL per ottenere solo i dati necessari
query = """
SELECT *
where datastart < '2014-01-01T00:00:00.000' and storico = 'N'
"""

# L'API restituisce valori in Json / sodapy li converte poi in liste di dizionari in python.

results = client.get(api_aria,query=query)

# Conversione in DataFrame 
aria_df = pd.DataFrame.from_records(results)

In [8]:
#aria_df.head(5)

Unnamed: 0,idsensore,nometiposensore,unitamisura,idstazione,nomestazione,quota,provincia,comune,storico,datastart,utm_nord,utm_est,lat,lng,location
0,12577,Particelle sospese PM2.5,µg/m³,627,Cremona - p.zza Cadorna,39,CR,Cremona,N,2012-01-01T00:00:00.000,4998110,579872,45.13194691285173,10.015741513336042,"{'latitude': '45.13194691285173', 'longitude':..."
1,5572,Biossido di Azoto,µg/m³,569,Sondrio - via Mazzini,307,SO,Sondrio,N,1993-11-01T00:00:00.000,5113078,567173,46.16796681227828,9.87014407497457,"{'latitude': '46.16796681227828', 'longitude':..."
2,5631,Biossido di Zolfo,µg/m³,546,Magenta,137,MI,Magenta,N,1995-04-22T00:00:00.000,5034328,490635,45.462415791106615,8.880210433125571,"{'latitude': '45.462415791106615', 'longitude'..."
3,5965,Biossido di Azoto,µg/m³,633,Soresina,70,CR,Soresina,N,1999-10-16T00:00:00.000,5014860,567712,45.28397715446764,9.863398419993636,"{'latitude': '45.28397715446764', 'longitude':..."
4,12610,Piombo,ng/m³,546,Magenta,137,MI,Magenta,N,2008-04-15T00:00:00.000,5034328,490635,45.462415791106615,8.880210433125571,"{'latitude': '45.462415791106615', 'longitude'..."


## Selezione stazioni e inquinanti


Dopo aver selezionato i sensori in base al periodo di attività, si procede con la selezione del tipo di sensore (Inquinante misurato) e di conseguenza delle stazioni.

In [None]:
# distribuzione sensori per tipo
aria_df.nometiposensore.value_counts()

Biossido di Azoto           84
Ossidi di Azoto             83
PM10 (SM2005)               63
Ozono                       51
Monossido di Carbonio       44
Biossido di Zolfo           32
Particelle sospese PM2.5    29
Benzene                     20
Cadmio                      14
Arsenico                    14
Nikel                       14
Benzo(a)pirene              14
Piombo                      14
Ammoniaca                   10
BlackCarbon                  2
Monossido di Azoto           1
Name: nometiposensore, dtype: int64

Si può notare come non ci sia uniformità nella distribuzione dei sensori e come molti inquinanti siano misurati raramente. Integrando le informazioni sugli inquinanti più importanti, espresse nelle normative sull'inquinamento e discusse nel report, con il numero di esemplari per tipo di sensore, si è giunti alla conclusione che:


Gli inquinanti da tenere sono:
    - Biossido d'azoto
    - PM10
    - Monossido di carbonio


In [None]:
inquinanti = ['Biossido di Azoto','PM10 (SM2005)','Monossido di Carbonio']

print(aria_df.idsensore.count()) 
filtered = aria_df[aria_df.nometiposensore.isin(inquinanti)] # selezioni inquinanti
print(filtered.idsensore.count())


489
191


Dopo aver ridotto i sensori ai tipi selezionati si procede a verificare quante delle stazioni ha il tris completo di sensori, visto che il monossido di carbonio è misurato in un numero di casi inferiore.


In [None]:
nsensori = filtered.groupby("idstazione",as_index=False)['nometiposensore'].count() # numero di sensori per ogni stazioni

In [None]:
complete_stations = list(nsensori[nsensori.nometiposensore==3].idstazione) # lista delle stazioni "complete", ovvero che hanno tutti e 3 i sensori selezionati

In [None]:
aria_df = filtered[filtered.idstazione.isin(complete_stations)] # selezione delle staizioni prescelte
print(aria_df.idsensore.count())

102


In [None]:
aria_df.groupby('idstazione').count().count()

idsensore          34
nometiposensore    34
unitamisura        34
nomestazione       34
quota              34
provincia          34
comune             34
storico            34
datastart          34
utm_nord           34
utm_est            34
lat                34
lng                34
location           34
dtype: int64

Alla fine del processo i sensori sono significativamente meno, ma permettono di fare una analisi completa visto che funzionano per tutto il periodo selezionato. Inoltre ogni zona geografica, individuata da una postazione, presenta gli stessi tre tipi di sensori.

Salva File Aria


In [None]:
# uncomment se si è sulla VM 
# aria_df.to_csv('/data/my-data/sensori_aria')

## Meteo



Per il meteo si procede nello stesso modo della qualità dell'aria, se non che si aggiunge un filtro direttamente in fase di richiesta API, sapendo a priori di volere solo i valori di Precipitazione.

In [None]:
query = """
SELECT *
where tipologia ='Precipitazione' and datastart < '2014-01-01T00:00:00.000' and storico = 'N'
"""
results = client.get(api_meteo,query=query)

# Conversione in DataFrame 
meteo_df = pd.DataFrame.from_records(results)


In [None]:
#meteo_df.head()

## Join


Per Join si intende la creazione di una tabella ponte tra la tabella della qualità dell'aria e del meteo. In particolare si tratta di una integrazione geografica in cui si va a valutare per ogni stazione dell'aria considerata, la stazione meteo più vicina associando all' *idstazione* aria l'*idstazione* corrispondente.
Questa tabella servirà nel flow di Nifi per il LookUp con Hbase.

In [None]:
# Si raggruppano i record delle due tabelle in base all'idstazione, aggregando poi
# con la moda, un trucco per avere solo stazione e coordinate associate

meteo_stations = meteo_df.groupby('idstazione')[['lat','lng']].agg(pd.Series.mode) # meteo
aria_stations = aria_df.groupby('idstazione')[['lat','lng']].agg(pd.Series.mode)   # aria

I risultati sono due tabelle con solo le info sulla posizione delle stazioni 

A questo punto viene aggiunta una colonna con l'attributo 'position', che esprime la posizione nel formato (lat,lng) , per facilitare il calcolo delle distanze


In [None]:
# aria
aria_stations['position']=list(zip(aria_stations.lat, aria_stations.lng))
aria_stations.drop(['lat','lng'],axis=1,inplace=True) # drop lat e lng
# meteo
meteo_stations['position']=list(zip(meteo_stations.lat, meteo_stations.lng))
meteo_stations.drop(['lat','lng'],axis=1,inplace=True) # drop lat e lng

In [None]:
def nearest (row):
# funzione che presa una riga del dataset restituisce la stazione meteo con minore distanza geodetica
    return meteo_stations.applymap(lambda x: geodesic(x,row).kilometers).idxmin()['position']

In [None]:
# qui avviene l'effettivo calcolo e viene associata ad ogni stazione dell'aria la stazione meteo più vicina
aria_stations['meteo_station'] = aria_stations.applymap(lambda x : nearest(x)) 

In [None]:
# drop coordinate gps
aria_stations.drop('position',axis=1,inplace= True)

In [None]:
aria_stations=aria_stations[aria_stations['meteo_station']!='1422'] # stazione problematica eliminata

Salva File BridgeTable



In [None]:
# uncomment se si è sulla VM 
#aria_stations.to_csv('/data/my-data/match_stazioni')

Si è aspettato a salvare le info sulle stazioni meteo, così da sfruttare il fatto che l'associazione tra stazioni aria e meteo per l'enrichment evidenzia il sottoinsieme ridotto di stazioni meteo che vengono effettivamente usate. In questo modo tramite il LookUp con una tabella ridotta, si avranno solo le misure utili come flowfile in Nifi

In [None]:
ok_meteo = list(aria_stations.meteo_station) # lista delle stazioni meteo che compaiono come stazione più vicina ad lameno una stazione aria
meteo_df = meteo_df[meteo_df.idstazione.isin(ok_meteo)] # df meteo filtrato secondo la lista appena descritta
#30 sensori! invece di 182

Salva File Meteo

In [None]:
# uncomment se si è sulla VM 
# meteo_df.to_csv('/data/my-data/sensori_meteo')