In [1]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, hour, minute, concat, lit, to_date
from pyspark.sql.types import TimestampType
import os
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [13]:
def init_spark():
    findspark.init()
    return SparkSession.builder.appName("MergeDatasets").getOrCreate()

def load_dataset(spark):
    # spark = init_spark()
    # Carica i dataset mensili
    path = "assets/data/dataset_completo"
    folders = ["QCLCD201301", "QCLCD201302", "QCLCD201303", "QCLCD201304",
               "QCLCD201305", "QCLCD201306", "QCLCD201307", "QCLCD201308",
               "QCLCD201309", "QCLCD201310", "QCLCD201311", "QCLCD201312"]

    # Carica ciascun dataset in un DataFrame separato
    dataframes = [spark.read.csv(os.path.join(path, folder, folder[5:] + "hourlyM.csv"), header=True, inferSchema=True)
                  for folder in folders]

    # Unisci i DataFrame in uno
    merged_df = dataframes[0]
    for df in dataframes[1:]:
        merged_df = merged_df.union(df)
    merged_df.show()    
    return merged_df

In [14]:
spark = init_spark()
merged_df = load_dataset(spark)

+----+----------+----+---------+---------------+--------------+----------------+----------------+---------------+----------+--------------+-------------+---------+-----------+--------------------+
|WBAN|      Date|Time|Altimeter|DewPointCelsius|DryBulbCelsius|RelativeHumidity|SeaLevelPressure|StationPressure|Visibility|WetBulbCelsius|WindDirection|WindSpeed|WeatherType|        SkyCondition|
+----+----------+----+---------+---------------+--------------+----------------+----------------+---------------+----------+--------------+-------------+---------+-----------+--------------------+
|3011|01/01/2013|   0|  1013.55|           null|          null|            null|            null|         722.32|     13.68|          null|        120.0|    8.855|       null|                 CLR|
|3011|01/01/2013|   1|  1013.89|           null|          null|            null|            null|         722.32|     16.09|          null|        100.0|     4.83|       null|                 CLR|
|3011|01/01/201

In [18]:
merged_df.columns

['WBAN',
 'Date',
 'Time',
 'Altimeter',
 'DewPointCelsius',
 'DryBulbCelsius',
 'RelativeHumidity',
 'SeaLevelPressure',
 'StationPressure',
 'Visibility',
 'WetBulbCelsius',
 'WindDirection',
 'WindSpeed',
 'WeatherType',
 'SkyCondition']

In [20]:
merged_df.count()

16867047

In [21]:
# Elimina tutte le righe con valori null nella colonna specifica (ad esempio, "NomeColonna")
nome_colonna = "DewPointCelsius"
df_senza_null = merged_df.na.drop(subset=["DewPointCelsius", "DryBulbCelsius", "RelativeHumidity", "Visibility", "WetBulbCelsius", "WindDirection", "WindSpeed"])
print(df_senza_null.count())
# Visualizza il DataFrame risultante
df_senza_null.show()


13890315
+----+----------+----+---------+---------------+--------------+----------------+----------------+---------------+----------+--------------+-------------+---------+-----------+--------------------+
|WBAN|      Date|Time|Altimeter|DewPointCelsius|DryBulbCelsius|RelativeHumidity|SeaLevelPressure|StationPressure|Visibility|WetBulbCelsius|WindDirection|WindSpeed|WeatherType|        SkyCondition|
+----+----------+----+---------+---------------+--------------+----------------+----------------+---------------+----------+--------------+-------------+---------+-----------+--------------------+
|3011|01/01/2013|   3|  1014.56|          -17.5|         -16.0|            89.0|            null|         722.99|     16.09|         -16.4|        110.0|     4.83|       null|                 CLR|
|3011|01/01/2013|   4|   1014.9|          -18.0|         -17.0|            95.0|            null|         723.33|     11.27|         -17.3|        100.0|     4.83|        -SN|              SCT028|
|3011|

In [22]:
# Importa le librerie necessarie
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
# Esegui una groupby per il campo "wban" e calcola le medie delle misure
# result = df_senza_null.groupBy("wban").agg(avg("DewPointCelsius").alias("DewPointCelsius"), avg("DryBulbCelsius").alias("DryBulbCelsius"))

colonne_da_mediare = ["DewPointCelsius", "DryBulbCelsius", "RelativeHumidity", "Visibility", "WetBulbCelsius", "WindDirection", "WindSpeed"]
agg_exprs = [avg(col(colonna)).alias(colonna) for colonna in colonne_da_mediare]
# Esegui la groupBy e calcola la media
result = df_senza_null.groupBy("wban").agg(*agg_exprs)
print(result.count())
print(result.columns)
# Visualizza i risultati
result.show()
# Chiudi la sessione Spark


1758
['wban', 'DewPointCelsius', 'DryBulbCelsius', 'RelativeHumidity', 'Visibility', 'WetBulbCelsius', 'WindDirection', 'WindSpeed']
+----+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|wban|     DewPointCelsius|    DryBulbCelsius|  RelativeHumidity|        Visibility|    WetBulbCelsius|     WindDirection|         WindSpeed|
+----+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|3749|  4.9041290675416995|12.499083948591743| 63.47320207820618|13.748165162701724| 8.967692097347555|233.38255400601585|13.367701668033908|
|3918|               7.812| 16.03709090909091| 60.38909090909091|14.945768181818265| 11.56297272727273|113.29818181818182| 6.010627272727268|
|3997|  5.6112037793071305|12.046798087017384| 67.83296395660795| 14.15039251137298| 8.764749795870754|171.22535868424123| 13.22342120611222|
|3179|   7.5128

In [23]:
# Seleziona le colonne rilevanti per la clusterizzazione (ad esempio, temperatura e umidità)
feature_columns = ["DewPointCelsius", "DryBulbCelsius", "RelativeHumidity", "Visibility", "WetBulbCelsius", "WindDirection", "WindSpeed"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df = assembler.transform(result)

# Crea il modello di clustering K-Means
kmeans = KMeans().setK(5)  # Specifica il numero di cluster desiderato
model = kmeans.fit(df)

# Esegui il clustering sui dati
clustered_data = model.transform(df)

# Visualizza i risultati
clustered_data.select("features", "prediction").show()


+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[4.90412906754169...|         3|
|[7.812,16.0370909...|         2|
|[5.61120377930713...|         0|
|[7.51289296856251...|         4|
|[9.21761829236056...|         2|
|[11.3327221768072...|         4|
|[2.68620114394661...|         3|
|[-0.7123476548073...|         0|
|[0.87731152204836...|         3|
|[5.80911483253589...|         2|
|[7.56750165672631...|         4|
|[0.00413040886875...|         4|
|[9.14449136518341...|         1|
|[7.03918990005261...|         1|
|[9.52270902220123...|         2|
|[-1.7625000000000...|         0|
|[-1.1868002357100...|         3|
|[11.8315141662796...|         1|
|[10.4695447214985...|         4|
|[0.72825412582206...|         2|
+--------------------+----------+
only showing top 20 rows



In [24]:
clustered_data.show()

+----+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+----------+
|wban|     DewPointCelsius|    DryBulbCelsius|  RelativeHumidity|        Visibility|    WetBulbCelsius|     WindDirection|         WindSpeed|            features|prediction|
+----+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+----------+
|3749|  4.9041290675416995|12.499083948591743| 63.47320207820618|13.748165162701724| 8.967692097347555|233.38255400601585|13.367701668033908|[4.90412906754169...|         3|
|3918|               7.812| 16.03709090909091| 60.38909090909091|14.945768181818265| 11.56297272727273|113.29818181818182| 6.010627272727268|[7.812,16.0370909...|         2|
|3997|  5.6112037793071305|12.046798087017384| 67.83296395660795| 14.15039251137298| 8.764749795870754|171.22535868424123| 13.2234

In [32]:
clustered_data_pandas = clustered_data.toPandas()
clustered_data_pandas.rename(columns={'wban':'WBAN'}, inplace=True)
clustered_data_pandas

Unnamed: 0,WBAN,DewPointCelsius,DryBulbCelsius,RelativeHumidity,Visibility,WetBulbCelsius,WindDirection,WindSpeed,features,prediction
0,3749,4.904129,12.499084,63.473202,13.748165,8.967692,233.382554,13.367702,"[4.9041290675416995, 12.499083948591743, 63.47...",3
1,3918,7.812000,16.037091,60.389091,14.945768,11.562973,113.298182,6.010627,"[7.812, 16.03709090909091, 60.38909090909091, ...",2
2,3997,5.611204,12.046798,67.832964,14.150393,8.764750,171.225359,13.223421,"[5.6112037793071305, 12.046798087017384, 67.83...",0
3,3179,7.512893,17.294462,59.931425,13.019207,12.122744,144.650228,8.149342,"[7.512892968562516, 17.294462443004555, 59.931...",4
4,3704,9.217618,13.991739,76.441383,14.673789,11.582851,139.625958,7.014215,"[9.21761829236056, 13.99173936029606, 76.44138...",2
...,...,...,...,...,...,...,...,...,...,...
1753,24284,13.978229,15.568215,90.608128,14.573222,14.574165,195.551524,12.378933,"[13.978229317851959, 15.56821480406386, 90.608...",3
1754,24213,13.516667,15.866667,86.416667,11.650833,14.491667,185.000000,13.812500,"[13.516666666666666, 15.866666666666667, 86.41...",0
1755,53884,6.103238,10.969933,75.321050,14.889606,8.707677,144.905081,8.498802,"[6.103238414293686, 10.969932998324966, 75.321...",4
1756,53932,5.000000,19.000000,40.000000,16.090000,11.700000,200.000000,17.700000,"[5.0, 19.0, 40.0, 16.09, 11.7, 200.0, 17.7]",3


In [33]:
import pandas as pd
df = pd.read_csv('./assets/data/2013stationM.csv')
df

Unnamed: 0,WBAN,Name,State,Location,Latitude,Longitude,StationHeight,Barometer,TimeZone
0,3011.0,TELLURIDE,CO,TELLURIDE REGIONAL AIRPORT,37.95000,-107.90000,2766.97,,-7
1,3012.0,TAOS,NM,TAOS REGIONAL AIRPORT,36.45000,-105.66667,2161.34,,-7
2,3013.0,LAMAR,CO,LAMAR MUNICIPAL AIRPORT,38.07000,-102.68806,1128.67,124449.83,-7
3,3014.0,TORREON,NM,TORREON,35.79910,-107.18130,2105.86,0.00,-7
4,3016.0,RIFLE,CO,GARFIELD CO REGIONAL ARPT,39.52778,-107.71972,1689.81,186454.63,-7
...,...,...,...,...,...,...,...,...,...
2151,93764.0,,MD,GAITHERSBURG MONTGOMERY COUNTY AIR PARK,39.16667,-77.16667,,,-5
2152,93804.0,,SC,SPARTANBURG DOWNTOWN MEMORIAL AIRPORT,34.91667,-81.95000,,,-5
2153,94035.0,,CO,FORT COLLINS LOVELAND AP,40.45000,-105.01667,,,-7
2154,94852.0,,IN,MARION MUNICIPAL AP,40.48333,-85.68333,,,-5


In [37]:
joined_df = pd.merge(df, clustered_data_pandas[['WBAN', 'prediction']], on='WBAN', how='inner')
joined_df

Unnamed: 0,WBAN,Name,State,Location,Latitude,Longitude,StationHeight,Barometer,TimeZone,prediction
0,3011.0,TELLURIDE,CO,TELLURIDE REGIONAL AIRPORT,37.95000,-107.90000,2766.97,,-7,2
1,3012.0,TAOS,NM,TAOS REGIONAL AIRPORT,36.45000,-105.66667,2161.34,,-7,4
2,3013.0,LAMAR,CO,LAMAR MUNICIPAL AIRPORT,38.07000,-102.68806,1128.67,124449.83,-7,4
3,3016.0,RIFLE,CO,GARFIELD CO REGIONAL ARPT,39.52778,-107.71972,1689.81,186454.63,-7,2
4,3017.0,DENVER,CO,DENVER INTERNATIONAL AIRPORT,39.83280,-104.65750,1655.37,182255.51,-7,0
...,...,...,...,...,...,...,...,...,...,...
1753,54939.0,CENTERVILLE,IA,CENTERVILLE MUNICIPAL AIRPORT,40.68400,-92.90100,,,-6,2
1754,54940.0,FOREST CITY,IA,FOREST CITY MUNICIPAL AIRPORT,43.23475,-93.62410,,,-6,0
1755,54941.0,IOWA FALLS,IA,IOWA FALLS MUNICIPAL AIRPORT,42.47138,-93.20707,,,-6,0
1756,54942.0,OSCEOLA,IA,OSCEOLA MUNICIPAL AIRPORT,41.05222,-93.68913,,,-6,3
