In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("pantheon") \
    .config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.11:2.4.0,org.postgresql:postgresql:42.1.1') \
    .config("spark.mongodb.input.uri","mongodb://root:mongodb@mongodb/pantheon.station?authSource=admin")\
    .getOrCreate()

## Recupero dei dati da MongoDB
Prima di leggere i dati bisogna caricare i dataset MongoDB. Eseguire lo script 'saveDataset.sh'

In [2]:
MONGODB_URI='mongodb://mongodb/'
def getCollection(sparksession):
    df = sparksession.read.format("com.mongodb.spark.sql.DefaultSource").load()
    return df

In [3]:
stazioneDf = getCollection(spark)

In [4]:
stazioneDf.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- data_ora: string (nullable = true)
 |-- id_dato: integer (nullable = true)
 |-- pioggia_mm: double (nullable = true)
 |-- pressione_mbar: double (nullable = true)
 |-- pressione_n_letture: integer (nullable = true)
 |-- pressione_standard_mbar: double (nullable = true)
 |-- rad W/mq: double (nullable = true)
 |-- rad W/mq array: string (nullable = true)
 |-- rad_n_letture: double (nullable = true)
 |-- temp1_max: double (nullable = true)
 |-- temp1_media: double (nullable = true)
 |-- temp1_min: double (nullable = true)
 |-- temp1_ur1_n_letture: integer (nullable = true)
 |-- ur1_max: double (nullable = true)
 |-- ur1_media: double (nullable = true)
 |-- ur1_min: double (nullable = true)
 |-- wind_dir: integer (nullable = true)
 |-- wind_dir_n_letture: integer (nullable = true)
 |-- wind_speed_max: double (nullable = true)
 |-- wind_speed_media: double (nullable = true)
 |-- wind_speed_n_letture: doubl

In [5]:
# con questa opzione della Spark Session impostata a True, non c'è più bisogno di usare show per vedere i dataframe;
# inoltre si possono vedere in modo non sballato
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [6]:
stazioneDf

_id,data_ora,id_dato,pioggia_mm,pressione_mbar,pressione_n_letture,pressione_standard_mbar,rad W/mq,rad W/mq array,rad_n_letture,temp1_max,temp1_media,temp1_min,temp1_ur1_n_letture,ur1_max,ur1_media,ur1_min,wind_dir,wind_dir_n_letture,wind_speed_max,wind_speed_media,wind_speed_n_letture
[5d34244f5bb1bc30...,2018-10-12 14:30:12,1811269,0.0,1087.05,5,1119.79,104.1,{},5.0,26.03,25.81,25.35,5,61.32,57.25,49.95,6,5,5.41,0.91,75.0
[5d34244f5bb1bc30...,2018-10-12 14:25:15,1811253,0.0,1085.94,5,1118.68,75.0,{},5.0,26.08,25.99,25.93,5,58.97,56.69,52.56,6,5,15.62,0.3,25.0
[5d34244f5bb1bc30...,2018-10-12 14:40:13,1811284,0.0,1088.63,5,1121.37,247.2,{},5.0,23.89,23.82,23.76,5,54.91,54.17,53.44,44,5,8.19,2.37,196.0
[5d34244f5bb1bc30...,2018-10-12 14:50:10,1811314,0.0,1086.41,5,1119.15,250.3,{},5.0,23.92,23.84,23.76,5,58.4,57.18,55.32,68,5,3.41,1.3,108.0
[5d34244f5bb1bc30...,2018-10-12 14:45:13,1811304,0.0,1086.11,5,1118.85,206.6,{},5.0,23.86,23.74,23.66,5,56.69,55.22,53.6,72,5,5.7,1.93,160.0
[5d34244f5bb1bc30...,2018-10-12 14:55:13,1811315,0.0,1086.35,5,1119.09,307.5,{},5.0,23.97,23.92,23.84,5,56.99,56.15,55.42,42,5,8.1,2.88,239.0
[5d34244f5bb1bc30...,2018-10-12 15:00:15,1811334,0.0,1086.35,5,1119.09,360.5,{},5.0,24.71,24.45,24.1,5,57.76,54.74,52.34,72,5,6.32,2.63,218.0
[5d34244f5bb1bc30...,2018-10-12 15:05:11,1811345,0.0,1086.29,5,1119.03,307.5,{},5.0,24.43,24.37,24.3,5,54.93,54.01,53.14,53,5,7.23,2.91,241.0
[5d34244f5bb1bc30...,2018-10-12 14:35:11,1811283,0.0,1085.7,5,1118.44,234.4,{},5.0,24.98,24.34,23.88,5,54.04,52.91,51.54,52,5,9.43,3.89,322.0
[5d34244f5bb1bc30...,2018-10-12 15:10:13,1811346,0.0,1086.29,5,1119.03,388.4,{},5.0,24.86,24.71,24.58,5,54.2,52.08,48.88,69,5,8.12,3.13,259.0


## Rilevamento di anomalie
Funzione che data una colonna del dataset come parametro, riporta i blocchi di record consecutivi con valori nulli e non nulli. Utile per capire anomalie o eventuali guasti ai sensori

In [28]:
# questa cella serve ad individuare i blocchi del dataframe accomunati da una caratteristica comune, ad esempio in questo caso
# vogliamo raggruppare tutti i blocchi con radiazione nulla e tutti quelli con radiazione non nulla
# la funzione lag serve , data una certa riga , a selezionare la riga che si trova ad un certo offset da questa, nel nostro caso 1
# usiamo lag per selezionare la riga subito sotto alla riga di inizio di un blocco 
# coalesce serve per selezionare, data una lista di colonne, la prima colonna non nulla
# count considera nel conteggio solo le celle non vuote, così può essere utile per individuare una sottocolonna con tutti valori non nulli
import pyspark.sql.functions as F
from pyspark.sql.window import Window

def anomalies_detection(column_name):
    nullRadSlot = stazioneDf
    nullRadSlot = nullRadSlot.withColumn("isnull", F.when(nullRadSlot[column_name].isNull(), True).otherwise(False))
    nullRadSlot = nullRadSlot.withColumn("lag_isnull", F.lag(nullRadSlot["isnull"],1).over(Window.orderBy(nullRadSlot["data_ora"])))
    nullRadSlot = nullRadSlot.withColumn("change", F.coalesce(nullRadSlot["isnull"]!=nullRadSlot["lag_isnull"],F.lit(False)))
    nullRadSlot = nullRadSlot.withColumn("block", F.sum(nullRadSlot["change"].cast("int")).over(Window.orderBy(nullRadSlot["data_ora"])))\
      .groupBy("block")\
      .agg(F.min(nullRadSlot["data_ora"]).alias('mindata'),
        F.max(nullRadSlot["data_ora"]).alias('maxdata'),
        (F.count(nullRadSlot[column_name])==0).alias('blocco_isnull'))

    timeFmt = "yyyy-MM-dd HH:mm:ss"
    nullRadSlot = nullRadSlot.withColumn("Duration", F.unix_timestamp('maxdata', format=timeFmt)\
                                         - F.unix_timestamp('mindata', format=timeFmt))

    nullRadSlot = nullRadSlot.withColumn("Duration",nullRadSlot.Duration/(3600*24))
    
    return nullRadSlot

In [31]:
df_anomaly = anomalies_detection('rad W/mq')

df_anomaly

block,mindata,maxdata,blocco_isnull,Duration
0,2018-10-12 14:25:15,2019-05-06 08:55:28,False,205.7709837962963
1,2019-05-06 13:40:10,2019-06-12 09:25:15,True,36.82297453703704


In [9]:
# elimina i blocchi che non sono True
df_anomaly = df_anomaly.select('mindata','maxdata','Duration').filter(df_anomaly['blocco_isnull']==True)

## Calcolo Report settimanale
Funzione che calcola per ogni settimana di un dato anno le temperature medie, le temperature massime (
massime, minime e medie), le temperature minime (massime, minime e medie), i millimetri di pioggia totali caduti durante la settimana e il numero di giorni della settimana in cui ha piovuto

In [17]:
from pyspark.sql.functions import year, to_date, col, dayofyear, sum, avg, count, min, max
# analisi che è meglio fare all'interno di un solo anno, perciò questa funzione prende in input l'anno da analizzare
def weekly_report(anno):
    
    df3 = stazioneDf.filter(year(stazioneDf.data_ora) == anno)
        # raggruppamento per giorni
    df3 = df3.withColumn("Data", to_date(col("data_ora")))\
        .select(dayofyear('data_ora').alias("Giorno_Anno"),\
                "Data",\
                "pioggia_mm",\
                "temp1_media",\
                "temp1_max","temp1_max","temp1_max",\
                "temp1_min","temp1_min","temp1_min",\
                "data_ora")\
        .groupBy("Giorno_Anno","Data")\
        .agg(sum("pioggia_mm").alias("Pioggia_Totale"),\
             sum("temp1_media").alias("Temperatura_Media_Totale"),\
             max("temp1_max").alias("Massima_Temperatura_Massima"),\
             min("temp1_max").alias("Minima_Temperatura_Massima"),\
             sum("temp1_max").alias("Temperatura_Massima_Totale"),\
             max("temp1_min").alias("Massima_Temperatura_Minima"),\
             min("temp1_min").alias("Minima_Temperatura_Minima"),\
             sum("temp1_min").alias("Temperatura_Minima_Totale"),\
             count("data_ora").alias("Conteggio_Record_Originali"))\
        .orderBy("Data")

    # se in un giorno hanno piovuto più di 2mm metto 1 (True), altrimenti 0 (False)
    df3 = df3.withColumn("Ha_Piovuto", F.when(df3["Pioggia_Totale"] >= 2.0, 1).otherwise(0))

    # determina la settimana divideno per 7, poi arrotonda e converte il valore in int
    df3 = df3.withColumn("Settimana", F.round(df3["Giorno_Anno"]/7).cast("int"))
    # modifica di una colonna già esistente
    df3 = df3.withColumn("Settimana", df3["Settimana"] + 1)
    
    # raggruppamento per settimane
    df3 = df3.select("Settimana",\
                     "Pioggia_Totale",\
                     "Ha_Piovuto",\
                     "Temperatura_Media_Totale",\
                     "Massima_Temperatura_Massima",\
                     "Minima_Temperatura_Massima",\
                     "Temperatura_Massima_Totale",\
                     "Massima_Temperatura_Minima",\
                     "Minima_Temperatura_Minima",\
                     "Temperatura_Minima_Totale",\
                     "Conteggio_Record_Originali")\
        .groupBy("Settimana")\
        .agg(sum("Pioggia_Totale").alias("Pioggia_Totale"),\
             sum("Ha_Piovuto").alias("Giorni_di_Pioggia"),\
             sum("Temperatura_Media_Totale").alias("Temperatura_Media_Totale"),\
             max("Massima_Temperatura_Massima").alias("Massima_Temperatura_Massima"),\
             min("Minima_Temperatura_Massima").alias("Minima_Temperatura_Massima"),\
             sum("Temperatura_Massima_Totale").alias("Temperatura_Massima_Totale"),\
             max("Massima_Temperatura_Minima").alias("Massima_Temperatura_Minima"),\
             min("Minima_Temperatura_Minima").alias("Minima_Temperatura_Minima"),\
             sum("Temperatura_Minima_Totale").alias("Temperatura_Minima_Totale"),\
             sum("Conteggio_Record_Originali").alias("Conteggio_Record_Originali"))\
        .orderBy("Settimana")

    # calcolo delle medie settimanali di temperatura media, massima e minima
    df3 = df3.withColumn("Temperatura_Media_Settimanale",\
                         (df3["Temperatura_Media_Totale"])/(df3["Conteggio_Record_Originali"]))
    df3 = df3.withColumn("Temperatura_Massima_Media",\
                         (df3["Temperatura_Massima_Totale"])/(df3["Conteggio_Record_Originali"]))
    df3 = df3.withColumn("Temperatura_Minima_Media",\
                         (df3["Temperatura_Minima_Totale"])/(df3["Conteggio_Record_Originali"]))

    df3 = df3.select("Settimana",\
                     "Pioggia_Totale","Giorni_di_Pioggia",\
                    "Temperatura_Media_Settimanale",\
                    "Massima_Temperatura_Massima",\
                     "Minima_Temperatura_Massima",\
                     "Temperatura_Massima_Media",\
                    "Massima_Temperatura_Minima",\
                    "Minima_Temperatura_Minima",\
                    "Temperatura_Minima_Media")
    
    print("Analisi delle settimane nell'anno",anno)
    return df3

In [18]:
df_weekly = weekly_report(2019)

df_weekly

Analisi delle settimane nell'anno 2019


Settimana,Pioggia_Totale,Giorni_di_Pioggia,Temperatura_Media_Settimanale,Massima_Temperatura_Massima,Minima_Temperatura_Massima,Temperatura_Massima_Media,Massima_Temperatura_Minima,Minima_Temperatura_Minima,Temperatura_Minima_Media
1,0.0,0,4.924027777777777,12.98,-1.14,5.174409722222221,12.46,-1.7,4.682673611111111
2,1.44,0,3.393630952380952,11.45,-4.75,3.6338988095238096,10.82,-5.32,3.1701636904761896
3,2.24,1,4.929493293591654,14.0,-1.98,5.20703427719821,13.64,-2.42,4.668360655737704
4,15.520000000000008,4,4.232757575757576,12.44,-1.12,4.382757575757576,11.62,-1.33,4.098439393939394
5,4.400000000000001,0,3.5415994020926758,11.46,-3.73,3.74593423019432,9.98,-4.14,3.351315396113603
6,16.719999999999995,3,8.05752976190476,15.54,-0.86,8.2403125,14.2,-1.4,7.887693452380951
7,0.0,0,7.591895522388061,18.12,-1.62,7.865940298507463,17.58,-2.1,7.328223880597016
8,0.08,0,7.32372197309417,17.35,-1.02,7.6161285500747375,16.67,-1.38,7.046188340807174
9,0.04,0,8.48891207153502,20.42,0.59,8.776557377049182,19.56,-0.3,8.213368107302534
10,0.16,0,10.170238095238094,19.48,1.96,10.420416666666666,18.24,1.27,9.935357142857145


## Connessione a postgresql
Prima di caricare su Postgres bisogna creare il database 'report' accedendo con adminer su http://127.0.0.1:8085

In [12]:
driver = "org.sqlite.JDBC"
url="jdbc:postgresql://postgres/report"
# tablename = "prova"
properties = {
    "driver": "org.postgresql.Driver",
    "user": "postgres",
    "password": "password"
}
# dbDataFrame = spark.read.jdbc(url=url,table=tablename,properties=properties)

# dbDataFrame.show()

In [13]:
# carica su postgres il dataframe output del job Anomalies_Detection

mode = 'overwrite'
df_anomaly.write.jdbc(url=url, table="guasti", mode=mode, properties=properties)

In [14]:
# carica su postgres il dataframe output del job Weekly_Report
mode = 'overwrite'
df_weekly.write.jdbc(url=url, table="report_settimanali", mode=mode, properties=properties)