In [1]:
#Lazio

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyproj

# Create a SparkSession
spark = SparkSession.builder.appName("timeseries").config('spark.sql.shuffle.partitions',500).config('spark.driver.maxResultSize', '10G') .config("spark.driver.memory", "32g").config("spark.executor.memory", "16g").config("spark.task.maxFailures", "10").config("spark.executor.instances", "16").config("spark.local.dir", "/afs/enea.it/por/user/nafis/PFS/tmp").getOrCreate()


schema = StructType(
   [StructField('x', DoubleType(), True),
    StructField('y', DoubleType(), True),
    StructField('z', DoubleType(), True),
    StructField('time', TimestampType(), True),
    StructField('c_PM25', DoubleType(), True),
    StructField('c_PM10', DoubleType(), True),
    StructField('c_O3', DoubleType(), True),
    StructField('c_NO2', DoubleType(), True),
    StructField('geometry', StringType(), True),
    StructField('index_right', IntegerType(), True),
    StructField('OBJECTID', IntegerType(), True),
    StructField('COD_REG', IntegerType(), True),
    StructField('COD_PRO', IntegerType(), True),
    StructField('NOME', StringType(), True),
    StructField('SIGLA', StringType(), True),
    StructField('Shape_Leng', DoubleType(), True),
    StructField('Shape_Area', DoubleType(), True),
   ]
  )


df = spark.read.format("csv").load("/afs/enea.it/por/user/nafis/PFS/tmp/nafi2/nafi/data_chunk/lazio_prov/", header=True, schema=schema)

# Select the columns in the desired order
df = df.select( 'x', 'y', 'z', 'time', 'c_PM25', 'c_PM10', 'c_O3', 'c_NO2', 'geometry', 'NOME')


df = df.withColumn("x", df["x"].cast(DoubleType())) \
    .withColumn("y", df["y"].cast(DoubleType())) \
    .withColumn("z", df["z"].cast(DoubleType())) \
    .withColumn("time", df["time"].cast(TimestampType())) \
    .withColumn("c_PM25", df["c_PM25"].cast(DoubleType())) \
    .withColumn("c_PM10", df["c_PM10"].cast(DoubleType())) \
    .withColumn("c_O3", df["c_O3"].cast(DoubleType())) \
    .withColumn("c_NO2", df["c_NO2"].cast(DoubleType())) \
    .withColumn("geometry", df["geometry"].cast(StringType()))\
    .withColumn("NOME", df["NOME"].cast(StringType()))

df = df.withColumnRenamed("c_PM25", "c_PM2_5")

df.show()

df.printSchema()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/06/26 22:32:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/26 22:32:38 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+--------+---------+----+-------------------+------------------+------------------+------------------+------------------+--------------------+-------+
|       x|        y|   z|               time|           c_PM2_5|            c_PM10|              c_O3|             c_NO2|            geometry|   NOME|
+--------+---------+----+-------------------+------------------+------------------+------------------+------------------+--------------------+-------+
|702000.0|4696000.0|20.0|2021-12-11 00:00:00|         1.2706414|1.5203451000000001|60.837047999999996|         2.0367115|POINT (702000 469...|Viterbo|
|702000.0|4696000.0|20.0|2021-12-11 01:00:00|          1.230251|          1.375211|         57.657654|         2.0870926|POINT (702000 469...|Viterbo|
|702000.0|4696000.0|20.0|2021-12-11 02:00:00|1.9736630000000002|          2.107126|          54.05386|          2.587756|POINT (702000 469...|Viterbo|
|702000.0|4696000.0|20.0|2021-12-11 03:00:00|1.7445994999999999|1.8961723000000001|          5

In [2]:
from pyspark.sql import functions as F
import pandas as pd

data = df.withColumnRenamed("time", "date_time")


# Convert the "date_time" column to a datetime object
data = data.withColumn("date_time", F.to_timestamp("date_time"))

# Extract the date and time values and store them in separate columns
data = data.withColumn("date", F.date_trunc("day", "date_time"))
data = data.withColumn("time", F.date_trunc("hour", "date_time"))

data = data.withColumnRenamed("date_time", "original_date_time")

# data.show()

In [3]:
from pyspark.sql.functions import col, when

# Create the season column based on the month of the date_column
data = data.withColumn(
    "season",
    when(col("original_date_time").between("2019-01-01 00:00:00", "2019-03-19 23:00:00") | col("original_date_time").between("2019-12-21 00:00:00", "2020-03-19 23:00:00") | col("original_date_time").between("2020-12-21 00:00:00", "2021-03-19 23:00:00") | col("original_date_time").between("2021-12-21 00:00:00", "2022-03-19 23:00:00") | col("original_date_time").between("2022-12-21 00:00:00", "2023-03-19 23:00:00"), "Winter")
    .when(col("original_date_time").between("2019-03-20 00:00:00", "2019-06-20 23:00:00") | col("original_date_time").between("2020-03-20 00:00:00", "2020-06-20 23:00:00") | col("original_date_time").between("2021-03-20 00:00:00", "2021-06-20 23:00:00") | col("original_date_time").between("2022-03-20 00:00:00", "2022-06-20 23:00:00") | col("original_date_time").between("2023-03-20 00:00:00", "2023-06-20 23:00:00"), "Spring")
    .when(col("original_date_time").between("2019-06-21 00:00:00", "2019-09-21 23:00:00") | col("original_date_time").between("2020-06-21 00:00:00", "2020-09-21 23:00:00") | col("original_date_time").between("2021-06-21 00:00:00", "2021-09-21 23:00:00") | col("original_date_time").between("2022-06-21 00:00:00", "2022-09-21 23:00:00") | col("original_date_time").between("2023-06-21 00:00:00", "2023-09-21 23:00:00"), "Summer")
    .when(col("original_date_time").between("2019-09-22 00:00:00", "2019-12-20 23:00:00") | col("original_date_time").between("2020-09-22 00:00:00", "2020-12-20 23:00:00") | col("original_date_time").between("2021-09-22 00:00:00", "2021-12-20 23:00:00") | col("original_date_time").between("2022-09-22 00:00:00", "2022-12-20 23:00:00") | col("original_date_time").between("2023-09-22 00:00:00", "2023-12-20 23:00:00"), "Autumn")
)


# data.show()

In [4]:
dataset = data.select(col("c_PM2_5"), col("c_PM10"), col("c_O3"), col("c_NO2"))
dataset_stat = dataset.describe()
dataset_stat.show()



+-------+------------------+--------------------+--------------------+--------------------+
|summary|           c_PM2_5|              c_PM10|                c_O3|               c_NO2|
+-------+------------------+--------------------+--------------------+--------------------+
|  count|          33650200|            33650200|            33650200|            33650200|
|   mean| 7.518901573640087|   9.895731412864988|    70.6286214955896|  7.2519637674854085|
| stddev| 7.556449815429103|   8.763785837638139|   22.40800767042539|  12.206253204229768|
|    min|       0.003801894|0.004714028400000...|9.932134999999999...|0.022215692000000002|
|    max|473.63754000000006|  499.67792000000003|           283.12817|            278.2284|
+-------+------------------+--------------------+--------------------+--------------------+



                                                                                

In [5]:
data_PM25 = data.select(col("original_date_time"),col("c_PM2_5"), col("x"), col("y"), col("NOME"))
data_PM25.show()

+-------------------+------------------+--------+---------+-------+
| original_date_time|           c_PM2_5|       x|        y|   NOME|
+-------------------+------------------+--------+---------+-------+
|2021-12-11 00:00:00|         1.2706414|702000.0|4696000.0|Viterbo|
|2021-12-11 01:00:00|          1.230251|702000.0|4696000.0|Viterbo|
|2021-12-11 02:00:00|1.9736630000000002|702000.0|4696000.0|Viterbo|
|2021-12-11 03:00:00|1.7445994999999999|702000.0|4696000.0|Viterbo|
|2021-12-11 04:00:00|         1.2854077|702000.0|4696000.0|Viterbo|
|2021-12-11 05:00:00|0.8523575999999999|702000.0|4696000.0|Viterbo|
|2021-12-11 06:00:00|        0.78199506|702000.0|4696000.0|Viterbo|
|2021-12-11 07:00:00|         1.2314979|702000.0|4696000.0|Viterbo|
|2021-12-11 08:00:00|         2.3940308|702000.0|4696000.0|Viterbo|
|2021-12-11 09:00:00|         3.0954514|702000.0|4696000.0|Viterbo|
|2021-12-11 10:00:00|         2.8416717|702000.0|4696000.0|Viterbo|
|2021-12-11 11:00:00|3.1728162999999996|702000.0

In [6]:
data_PM10 = data.select(col("original_date_time"),col("c_PM10"), col("x"), col("y"), col("NOME"))
data_PM10.show()

+-------------------+------------------+--------+---------+-------+
| original_date_time|            c_PM10|       x|        y|   NOME|
+-------------------+------------------+--------+---------+-------+
|2021-12-11 00:00:00|1.5203451000000001|702000.0|4696000.0|Viterbo|
|2021-12-11 01:00:00|          1.375211|702000.0|4696000.0|Viterbo|
|2021-12-11 02:00:00|          2.107126|702000.0|4696000.0|Viterbo|
|2021-12-11 03:00:00|1.8961723000000001|702000.0|4696000.0|Viterbo|
|2021-12-11 04:00:00|1.5713663999999998|702000.0|4696000.0|Viterbo|
|2021-12-11 05:00:00|         1.1182661|702000.0|4696000.0|Viterbo|
|2021-12-11 06:00:00|        0.94059986|702000.0|4696000.0|Viterbo|
|2021-12-11 07:00:00|         1.4721483|702000.0|4696000.0|Viterbo|
|2021-12-11 08:00:00|         3.0101209|702000.0|4696000.0|Viterbo|
|2021-12-11 09:00:00|3.8334242999999995|702000.0|4696000.0|Viterbo|
|2021-12-11 10:00:00|3.5963568999999995|702000.0|4696000.0|Viterbo|
|2021-12-11 11:00:00|          3.890354|702000.0

In [7]:
data_NO2 = data.select(col("original_date_time"),col("c_NO2"), col("x"), col("y"), col("NOME"))
data_NO2.show()

+-------------------+------------------+--------+---------+-------+
| original_date_time|             c_NO2|       x|        y|   NOME|
+-------------------+------------------+--------+---------+-------+
|2021-12-11 00:00:00|         2.0367115|702000.0|4696000.0|Viterbo|
|2021-12-11 01:00:00|         2.0870926|702000.0|4696000.0|Viterbo|
|2021-12-11 02:00:00|          2.587756|702000.0|4696000.0|Viterbo|
|2021-12-11 03:00:00|         2.1309106|702000.0|4696000.0|Viterbo|
|2021-12-11 04:00:00|1.6039573999999999|702000.0|4696000.0|Viterbo|
|2021-12-11 05:00:00|         1.8012519|702000.0|4696000.0|Viterbo|
|2021-12-11 06:00:00|          2.225234|702000.0|4696000.0|Viterbo|
|2021-12-11 07:00:00|         2.2615266|702000.0|4696000.0|Viterbo|
|2021-12-11 08:00:00|2.1766273999999997|702000.0|4696000.0|Viterbo|
|2021-12-11 09:00:00|2.1541189999999997|702000.0|4696000.0|Viterbo|
|2021-12-11 10:00:00|1.7994093999999998|702000.0|4696000.0|Viterbo|
|2021-12-11 11:00:00|1.9321240000000002|702000.0

In [8]:
data_O3 = data.select(col("original_date_time"),col("c_O3"), col("x"), col("y"), col("NOME"))
data_O3.show()

+-------------------+------------------+--------+---------+-------+
| original_date_time|              c_O3|       x|        y|   NOME|
+-------------------+------------------+--------+---------+-------+
|2021-12-11 00:00:00|60.837047999999996|702000.0|4696000.0|Viterbo|
|2021-12-11 01:00:00|         57.657654|702000.0|4696000.0|Viterbo|
|2021-12-11 02:00:00|          54.05386|702000.0|4696000.0|Viterbo|
|2021-12-11 03:00:00|          54.00688|702000.0|4696000.0|Viterbo|
|2021-12-11 04:00:00|54.418915000000005|702000.0|4696000.0|Viterbo|
|2021-12-11 05:00:00| 53.96324499999999|702000.0|4696000.0|Viterbo|
|2021-12-11 06:00:00|         48.937576|702000.0|4696000.0|Viterbo|
|2021-12-11 07:00:00|46.245734999999996|702000.0|4696000.0|Viterbo|
|2021-12-11 08:00:00|          51.23545|702000.0|4696000.0|Viterbo|
|2021-12-11 09:00:00|         51.540134|702000.0|4696000.0|Viterbo|
|2021-12-11 10:00:00|           54.0782|702000.0|4696000.0|Viterbo|
|2021-12-11 11:00:00|          54.80785|702000.0

# Prophet Model

In [9]:
import io, os, sys, setuptools, tokenize
from pyspark.sql.types import StructType,StructField,StringType,TimestampType,ArrayType,DoubleType
from pyspark.sql.functions import current_date
from pyspark.sql.functions import pandas_udf, PandasUDFType
import matplotlib.pyplot as plt
from prophet import Prophet
from datetime import datetime
import pandas as pd
from time import time

In [10]:
# select the "first_name" and "last_name" columns and get distinct values
unique_coordinates = data.select(col("x"), col("y"), col("NOME")).distinct()

# show the unique values
unique_coordinates.show(60)



+--------+---------+-------+
|       x|        y|   NOME|
+--------+---------+-------+
|806000.0|4632000.0|   Roma|
|822000.0|4672000.0|  Rieti|
|778000.0|4660000.0|   Roma|
|822000.0|4716000.0|  Rieti|
|814000.0|4600000.0| Latina|
|822000.0|4636000.0|   Roma|
|814000.0|4640000.0|   Roma|
|754000.0|4708000.0|Viterbo|
|742000.0|4684000.0|Viterbo|
|786000.0|4664000.0|   Roma|
|746000.0|4708000.0|Viterbo|
|818000.0|4612000.0| Latina|
|814000.0|4708000.0|  Rieti|
|774000.0|4696000.0|Viterbo|
|750000.0|4680000.0|Viterbo|
|798000.0|4680000.0|   Roma|
|766000.0|4680000.0|Viterbo|
|782000.0|4660000.0|   Roma|
|742000.0|4736000.0|Viterbo|
|766000.0|4676000.0|Viterbo|
|778000.0|4696000.0|Viterbo|
|738000.0|4676000.0|   Roma|
|738000.0|4740000.0|Viterbo|
|782000.0|4696000.0|Viterbo|
|790000.0|4672000.0|   Roma|
|818000.0|4656000.0|   Roma|
|706000.0|4696000.0|Viterbo|
|730000.0|4724000.0|Viterbo|
|714000.0|4688000.0|Viterbo|
|742000.0|4724000.0|Viterbo|
|758000.0|4720000.0|Viterbo|
|802000.0|4648

                                                                                

PM2.5

In [19]:
from pyspark.sql.functions import *
from time import time
import matplotlib.pyplot as plt
import os

data_PM25_prophet = data_PM25.groupBy("NOME", "original_date_time").agg(avg("c_PM2_5").alias("avg_c_PM25"))
data_PM25_prophet.show()

data_PM25_prophet_u = data_PM25_prophet.withColumnRenamed("avg_c_PM25", "y").withColumnRenamed("original_date_time", "ds")

data_PM25_prophet = data_PM25_prophet_u.withColumn("ds", to_timestamp("ds", "yyyy-MM-dd HH:mm:ss")) \
                                       .withColumn("ds", from_utc_timestamp("ds", "Europe/Rome"))

data_PM25_prophet.printSchema()

def train_and_forecast(group):
    
    print(f"Processing group: {group['NOME'].iloc[0]} with {len(group)} data points")
    
    # Initiate the model
    
    # Convert "ds" column to pandas datetime format and set timezone to Europe/Rome
    group["ds"] = pd.to_datetime(group["ds"])
    
    m = Prophet(seasonality_mode="additive",
                    yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=True,
                    interval_width=0.95)
    
    # Fit the model
    m.fit(group[group['ds'] <= cutoff_date])
    
    # Make predictions
    forecast = m.predict(group)

    # Create the directory if it does not exist
    path = '/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/PM25/'

    if not os.path.exists(path):
        os.makedirs(path)
        
    try:
        fig1 = m.plot(forecast)
        fig1.suptitle(f"PM2.5 Concentration Forecast for {group['NOME'].iloc[0]}")
        fig1.legend(['Actual', 'Forecasted'], loc='upper right')
        fig1.subplots_adjust(top=0.9)
      
        fig2 = m.plot_components(forecast)
        fig2.suptitle(f"PM2.5 Concentration Component Forecast for {group['NOME'].iloc[0]}")
        fig2.subplots_adjust(top=0.9)

        #Save the plots as images using Matplotlib's savefig method
        fig1.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/PM25/{group['NOME'].iloc[0]}_prophet_forecast.png")
        fig2.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/PM25/{group['NOME'].iloc[0]}_prophet_components.png")
    
    except Exception as e:
        print(f"Error generating plots for group {group['NOME'].iloc[0]}: {str(e)}")
    
    f_pd = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','trend', 'yearly', 'weekly', 'daily']].set_index("ds")
 
    group_pd = group[['ds', 'NOME', 'y']].set_index("ds")
    
    result_pd = f_pd.join( group_pd, how = "left")
    result_pd.reset_index(level=0, inplace=True)
    
    result_pd['NOME'] = group['NOME'].iloc[0]
    
    #Return the forecasted results
    return result_pd[['ds', 'NOME', 'y', 'yhat', 'yhat_upper', 'yhat_lower','trend', 'yearly', 'weekly', 'daily']] 

# Define the result schema
result_schema =StructType([
    StructField('ds',TimestampType()),
    StructField('NOME', StringType()),
    StructField('y', DoubleType()),
    StructField('yhat',FloatType()),
    StructField('yhat_upper',FloatType()),
    StructField('yhat_lower',FloatType()),
    StructField('trend',FloatType()),
    StructField('yearly',FloatType()),
    StructField('weekly',FloatType()),
    StructField('daily',FloatType())
  ])

# create train and test data based on cutoff date
cutoff_date = "2021-06-30 23:00:00"

data_PM25_prophet_train = data_PM25_prophet.filter(col('ds') <= cutoff_date)
data_PM25_prophet_test = data_PM25_prophet.filter((col('ds') > cutoff_date))

max_date = data_PM25_prophet_test.agg(max('ds')).collect()[0][0]

period = data_PM25_prophet_test.count()

# Start time
start_time = time()
# Train and forecast by ticker 
spark_forecast_PM25 = data_PM25_prophet.groupBy("NOME").applyInPandas(train_and_forecast, schema=result_schema)
# Processing time
print('The time used for the Spark forecast is ', time()-start_time)

spark_forecast_PM25.show()


                                                                                

+-------+-------------------+------------------+
|   NOME| original_date_time|        avg_c_PM25|
+-------+-------------------+------------------+
|Viterbo|2022-02-02 01:00:00|1.8850021646017705|
|  Rieti|2021-11-01 05:00:00|2.8075066410404617|
|   Roma|2021-12-12 02:00:00|2.2928461067359045|
|  Rieti|2021-12-19 15:00:00| 6.343829013872831|
|  Rieti|2021-05-20 22:00:00| 1.867738284393064|
|  Rieti|2021-10-08 15:00:00|1.5961037958381503|
|   Roma|2021-05-22 11:00:00|3.5715732362017802|
| Latina|2021-06-10 21:00:00|4.2314189214285705|
| Latina|2021-12-13 00:00:00| 7.768597213214285|
|   Roma|2021-10-11 14:00:00|  2.76334225222552|
|Viterbo|2021-06-27 07:00:00|3.2851764265486727|
|   Roma|2021-10-11 19:00:00|3.1654847890207707|
|Viterbo|2021-10-08 00:00:00| 1.489313195353982|
|  Rieti|2021-10-08 07:00:00| 1.947233283526011|
|Viterbo|2021-12-07 13:00:00|2.9378791163716818|
| Latina|2021-05-21 09:00:00| 4.230126760714284|
|Viterbo|2021-11-29 20:00:00|2.7331635827433627|
|  Rieti|2021-03-09 

                                                                                

The time used for the Spark forecast is  0.05734062194824219


Processing group: Rieti with 29862 data points                      (0 + 1) / 1]
14:59:03 - cmdstanpy - INFO - Chain [1] start processing
14:59:18 - cmdstanpy - INFO - Chain [1] done processing


+-------------------+-----+------------------+---------+----------+-----------+----------+---------+-----------+------------+
|                 ds| NOME|                 y|     yhat|yhat_upper| yhat_lower|     trend|   yearly|     weekly|       daily|
+-------------------+-----+------------------+---------+----------+-----------+----------+---------+-----------+------------+
|2019-01-01 01:00:00|Rieti| 7.184400588439304|5.5260086| 11.956094|-0.90522933|0.96959853|2.8750517| 0.05885533|   1.6225029|
|2019-01-01 02:00:00|Rieti|  6.35735458150289|5.2553344|  11.70554| -0.9260314|0.97456086|2.8789773| 0.07901918|    1.322777|
|2019-01-01 03:00:00|Rieti| 5.176235379190753|4.8768744| 11.135719| -1.4033744| 0.9795232|2.8829155| 0.09737682|   0.9170589|
|2019-01-01 04:00:00|Rieti|4.5984373115606925| 4.495203| 10.598423| -1.7664592|0.98448557|2.8868659| 0.11379738|   0.5100541|
|2019-01-01 05:00:00|Rieti| 4.466292086127169|4.2132306| 10.511659|  -2.591771| 0.9894479|2.8908284| 0.12816855|  0.20

Processing group: Viterbo with 29862 data points
                                                                                

In [20]:
from pyspark.sql.functions import col
from pyspark.sql.functions import pow, sqrt

# Calculate MAE
mae_PM25 = spark_forecast_PM25.select(col("NOME"), abs(col("y") - col("yhat")).alias("error")) \
                    .groupBy("NOME") \
                    .agg({"error": "mean"}) \
                    .withColumnRenamed("avg(error)", "MAE")

# Calculate MSE
mse_PM25 = spark_forecast_PM25.select(col("NOME"), pow(col("y") - col("yhat"), 2).alias("error")) \
                    .groupBy("NOME") \
                    .agg({"error": "mean"}) \
                    .withColumnRenamed("avg(error)", "MSE")

# Calculate RMSE
rmse_PM25 = spark_forecast_PM25.select(col("NOME"), pow(col("y") - col("yhat"), 2).alias("error")) \
                     .groupBy("NOME") \
                     .agg({"error": "mean"}) \
                     .withColumnRenamed("avg(error)", "MSE") \
                     .withColumn("RMSE", sqrt(col("MSE")).alias("RMSE"))

# Show the results
mae_PM25.show()
mse_PM25.show()
rmse_PM25.show()

14:59:29 - cmdstanpy - INFO - Chain [1] start processing         (0 + 96) / 119]
14:59:47 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points                  (0 + 2) / 2]
Processing group: Rieti with 29862 data points
15:00:14 - cmdstanpy - INFO - Chain [1] start processing
15:00:14 - cmdstanpy - INFO - Chain [1] start processing
15:00:32 - cmdstanpy - INFO - Chain [1] done processing
15:00:34 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
15:00:42 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
15:00:44 - cmdstanpy - INFO - Chain [1] start processing
15:01:02 - cmdstanpy - INFO - Chain [1] done processing
15:01:02 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
15:01:11 - cmdstanpy - INFO - Chain [1] start processing            (1 + 1) / 2]
15:01:23 - cmdstanpy - INFO - Chain [1] done processing
   

+---------+------------------+
|     NOME|               MAE|
+---------+------------------+
|  Viterbo| 3.155507109172206|
|    Rieti|2.7115033544297553|
|   Latina| 3.580196274905541|
|Frosinone| 3.862957565730451|
|     Roma| 4.534959517075816|
+---------+------------------+



Processing group: Frosinone with 29862 data points
15:02:14 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
15:02:14 - cmdstanpy - INFO - Chain [1] start processing
15:02:33 - cmdstanpy - INFO - Chain [1] done processing
15:02:33 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
15:02:42 - cmdstanpy - INFO - Chain [1] start processing
15:02:43 - cmdstanpy - INFO - Chain [1] start processing
15:03:02 - cmdstanpy - INFO - Chain [1] done processing
15:03:02 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
15:03:12 - cmdstanpy - INFO - Chain [1] start processing            (1 + 1) / 2]
15:03:23 - cmdstanpy - INFO - Chain [1] done processing
                                                                                

+---------+------------------+
|     NOME|               MSE|
+---------+------------------+
|  Viterbo|17.675396610709733|
|    Rieti|13.232938597060423|
|   Latina| 22.86308769936046|
|Frosinone| 28.50381361550869|
|     Roma|34.991957228262685|
+---------+------------------+



Processing group: Frosinone with 29862 data points
15:04:14 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
15:04:14 - cmdstanpy - INFO - Chain [1] start processing
15:04:31 - cmdstanpy - INFO - Chain [1] done processing
15:04:35 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
15:04:42 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
15:04:45 - cmdstanpy - INFO - Chain [1] start processing
15:05:03 - cmdstanpy - INFO - Chain [1] done processing
15:05:05 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
15:05:14 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
15:05:25 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]


+---------+------------------+------------------+
|     NOME|               MSE|              RMSE|
+---------+------------------+------------------+
|  Viterbo|17.675396610709733| 4.204211770440415|
|    Rieti|13.232938597060423|3.6377106258003016|
|   Latina| 22.86308769936046| 4.781536123398051|
|Frosinone| 28.50381361550869| 5.338896291885495|
|     Roma|34.991957228262685|5.9154000057699125|
+---------+------------------+------------------+



                                                                                

In [11]:
#Long and Short Term Forecast

from pyspark.sql.functions import *
from time import time
import matplotlib.pyplot as plt
import os

data_PM25_prophet = data_PM25.groupBy("NOME", "original_date_time").agg(avg("c_PM2_5").alias("avg_c_PM25"))
data_PM25_prophet.show()

data_PM25_prophet_u = data_PM25_prophet.withColumnRenamed("avg_c_PM25", "y").withColumnRenamed("original_date_time", "ds")

data_PM25_prophet = data_PM25_prophet_u.withColumn("ds", to_timestamp("ds", "yyyy-MM-dd HH:mm:ss")) \
                                       .withColumn("ds", from_utc_timestamp("ds", "Europe/Rome"))

data_PM25_prophet.printSchema()

def train_and_forecast(group):
    
    print(f"Processing group: {group['NOME'].iloc[0]} with {len(group)} data points")
    
    # Initiate the model
    
    # Convert "ds" column to pandas datetime format and set timezone to Europe/Rome
    group["ds"] = pd.to_datetime(group["ds"])
    
    m = Prophet(seasonality_mode="additive",
                    yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=True,
                    interval_width=0.95)
    
    # Fit the model
    m.fit(group[group['ds'] <= cutoff_date])
    
    # Make predictions
    forecast = m.predict(group)

    # Create the directory if it does not exist
    path = '/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/PM25/'

    if not os.path.exists(path):
        os.makedirs(path)
    
   # try:
    #    fig1 = m.plot(forecast)
     #   fig1.suptitle(f"PM2.5 Concentration Forecast for {group['NOME'].iloc[0]}")
     #   fig1.legend(['Actual', 'Forecasted'], loc='upper right')
     #   fig1.subplots_adjust(top=0.9)
        
      #  fig2 = m.plot_components(forecast)
       # fig2.suptitle(f"PM2.5 Concentration Component Forecast for {group['NOME'].iloc[0]}")
       # fig2.subplots_adjust(top=0.9)

        # Save the plots as images using Matplotlib's savefig method
        #fig1.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet/PM25/{group['NOME'].iloc[0]}_prophet_forecast.png")
        #fig2.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet/PM25/{group['NOME'].iloc[0]}_prophet_components.png")
    
    #except Exception as e:
     #   print(f"Error generating plots for group {group['NOME'].iloc[0]}: {str(e)}")
    
    f_pd = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','trend', 'yearly', 'weekly', 'daily']].set_index("ds")
 
    group_pd = group[['ds', 'NOME', 'y']].set_index("ds")
    
    result_pd = f_pd.join( group_pd, how = "left")
    result_pd.reset_index(level=0, inplace=True)
    
    result_pd['NOME'] = group['NOME'].iloc[0]
    
    #Return the forecasted results
    return result_pd[['ds', 'NOME', 'y', 'yhat', 'yhat_upper', 'yhat_lower','trend', 'yearly', 'weekly', 'daily']] 

# Define the result schema
result_schema =StructType([
    StructField('ds',TimestampType()),
    StructField('NOME', StringType()),
    StructField('y', DoubleType()),
    StructField('yhat',FloatType()),
    StructField('yhat_upper',FloatType()),
    StructField('yhat_lower',FloatType()),
    StructField('trend',FloatType()),
    StructField('yearly',FloatType()),
    StructField('weekly',FloatType()),
    StructField('daily',FloatType())
  ])

# create train and test data based on cutoff date
cutoff_date = "2021-06-30 23:00:00"

data_PM25_prophet_train = data_PM25_prophet.filter(col('ds') <= cutoff_date)
data_PM25_prophet_test = data_PM25_prophet.filter((col('ds') > cutoff_date))

max_date = data_PM25_prophet_test.agg(max('ds')).collect()[0][0]

period = data_PM25_prophet_test.count()

# Start time
start_time = time()
# Train and forecast by ticker 
spark_forecast_PM25 = data_PM25_prophet.groupBy("NOME").applyInPandas(train_and_forecast, schema=result_schema)
# Processing time
print('The time used for the Spark forecast is ', time()-start_time)

#spark_forecast_PM25.show()


                                                                                

+-------+-------------------+------------------+
|   NOME| original_date_time|        avg_c_PM25|
+-------+-------------------+------------------+
|Viterbo|2022-02-02 01:00:00|1.8850021646017705|
|  Rieti|2021-11-01 05:00:00|2.8075066410404617|
|   Roma|2021-12-12 02:00:00|2.2928461067359045|
|  Rieti|2021-12-19 15:00:00| 6.343829013872831|
|  Rieti|2021-05-20 22:00:00| 1.867738284393064|
|  Rieti|2021-10-08 15:00:00|1.5961037958381503|
|   Roma|2021-05-22 11:00:00|3.5715732362017802|
| Latina|2021-06-10 21:00:00|4.2314189214285705|
| Latina|2021-12-13 00:00:00| 7.768597213214285|
|   Roma|2021-10-11 14:00:00|  2.76334225222552|
|Viterbo|2021-06-27 07:00:00|3.2851764265486727|
|   Roma|2021-10-11 19:00:00|3.1654847890207707|
|Viterbo|2021-10-08 00:00:00| 1.489313195353982|
|  Rieti|2021-10-08 07:00:00| 1.947233283526011|
|Viterbo|2021-12-07 13:00:00|2.9378791163716818|
| Latina|2021-05-21 09:00:00| 4.230126760714284|
|Viterbo|2021-11-29 20:00:00|2.7331635827433627|
|  Rieti|2021-03-09 

                                                                                

The time used for the Spark forecast is  0.5329997539520264


In [12]:
from pyspark.sql.functions import col, pow, sqrt

# Define cutoff dates
cutoff_dates = {
    "5 Days": "2021-07-05 23:00:00",
    "30 Days": "2021-07-30 23:00:00",
    "90 Days": "2021-09-28 23:00:00",
    "180 Days": "2021-12-27 23:00:00",
    "365 Days": "2022-06-29 23:00:00"
}

# Define a list to store the evaluation results
results = []

# Compute evaluation metrics for each time period and NOME
for period, cutoff in cutoff_dates.items():
    for nome in spark_forecast_PM25.select("NOME").distinct().rdd.flatMap(lambda x: x).collect():
        # Filter the forecast data based on the cutoff date and NOME
        forecast_data_PM25 = spark_forecast_PM25.filter((col("ds") <= cutoff) & (col("NOME") == nome))

        # Calculate MAE
        mae = forecast_data_PM25.select(abs(col("y") - col("yhat")).alias("error")) \
                             .agg({"error": "mean"}) \
                             .withColumnRenamed("avg(error)", "MAE") \
                             .collect()[0]["MAE"]

        # Calculate MSE
        mse = forecast_data_PM25.select(pow(col("y") - col("yhat"), 2).alias("error")) \
                             .agg({"error": "mean"}) \
                             .withColumnRenamed("avg(error)", "MSE") \
                             .collect()[0]["MSE"]

        # Calculate RMSE
        rmse = forecast_data_PM25.select(pow(col("y") - col("yhat"), 2).alias("error")) \
                              .agg({"error": "mean"}) \
                              .withColumnRenamed("avg(error)", "MSE") \
                              .withColumn("RMSE", sqrt(col("MSE")).alias("RMSE")) \
                              .collect()[0]["RMSE"]

        # Calculate coverage
        coverage = forecast_data_PM25.filter((col("y") >= col("yhat_lower")) & (col("y") <= col("yhat_upper"))) \
                                  .count() / forecast_data_PM25.count() * 100

        # Add the results to the list
        results.append((period, nome, mse, rmse, mae, coverage))

# Create a Spark DataFrame from the results list
result_df_PM25 = spark.createDataFrame(results, ["Time", "NOME", "MSE", "RMSE", "MAE", "Coverage"])

# Show the results
result_df_PM25.show(100)

Processing group: Frosinone with 29862 data points                  (0 + 2) / 2]
Processing group: Rieti with 29862 data points
22:36:30 - cmdstanpy - INFO - Chain [1] start processing
22:36:30 - cmdstanpy - INFO - Chain [1] start processing
22:36:48 - cmdstanpy - INFO - Chain [1] done processing
22:36:50 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
22:36:56 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
22:36:58 - cmdstanpy - INFO - Chain [1] start processing
22:37:11 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
22:37:12 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
22:37:18 - cmdstanpy - INFO - Chain [1] start processing
22:37:30 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
22:38:10 - cmdstanpy - INFO - Chain [1] start processing       

23:01:16 - cmdstanpy - INFO - Chain [1] done processing
23:01:17 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
23:01:24 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
23:01:26 - cmdstanpy - INFO - Chain [1] start processing
23:01:41 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
23:01:41 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
23:01:49 - cmdstanpy - INFO - Chain [1] start processing            (1 + 1) / 2]
23:02:00 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Rieti with 29862 data points                      (0 + 2) / 2]
Processing group: Frosinone with 29862 data points
23:04:05 - cmdstanpy - INFO - Chain [1] start processing
23:04:05 - cmdstanpy - INFO - Chain [1] start processing
23:04:19 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
23:04:2

Processing group: Latina with 29862 data points
23:24:58 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
23:25:00 - cmdstanpy - INFO - Chain [1] start processing
23:25:17 - cmdstanpy - INFO - Chain [1] done processing
23:25:18 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
23:25:26 - cmdstanpy - INFO - Chain [1] start processing
23:25:37 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points                  (0 + 2) / 2]
Processing group: Rieti with 29862 data points
23:28:25 - cmdstanpy - INFO - Chain [1] start processing
23:28:25 - cmdstanpy - INFO - Chain [1] start processing
23:28:44 - cmdstanpy - INFO - Chain [1] done processing
23:28:46 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
23:28:52 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterb

23:50:30 - cmdstanpy - INFO - Chain [1] start processing
23:50:43 - cmdstanpy - INFO - Chain [1] done processing
23:50:46 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (0 + 2) / 2]
23:50:51 - cmdstanpy - INFO - Chain [1] start processing
23:51:04 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points                  (0 + 2) / 2]
Processing group: Rieti with 29862 data points
23:54:06 - cmdstanpy - INFO - Chain [1] start processing
23:54:06 - cmdstanpy - INFO - Chain [1] start processing
23:54:22 - cmdstanpy - INFO - Chain [1] done processing
23:54:25 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
23:54:30 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
23:54:33 - cmdstanpy - INFO - Chain [1] start processing
23:54:48 - cmdstanpy - INFO - Chain [1] done pr

Processing group: Roma with 29862 data points                       (1 + 1) / 2]
00:15:32 - cmdstanpy - INFO - Chain [1] start processing
00:15:46 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points                  (0 + 2) / 2]
Processing group: Rieti with 29862 data points
00:18:40 - cmdstanpy - INFO - Chain [1] start processing
00:18:40 - cmdstanpy - INFO - Chain [1] start processing
00:18:59 - cmdstanpy - INFO - Chain [1] done processing
00:18:59 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
00:19:07 - cmdstanpy - INFO - Chain [1] start processing
00:19:07 - cmdstanpy - INFO - Chain [1] start processing
00:19:27 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
00:19:28 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
00:19:36 - cmdstanpy - 

Processing group: Rieti with 29862 data points                      (0 + 2) / 2]
Processing group: Frosinone with 29862 data points
00:40:43 - cmdstanpy - INFO - Chain [1] start processing
00:40:43 - cmdstanpy - INFO - Chain [1] start processing
00:41:02 - cmdstanpy - INFO - Chain [1] done processing
00:41:04 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
00:41:11 - cmdstanpy - INFO - Chain [1] start processing
00:41:12 - cmdstanpy - INFO - Chain [1] start processing
00:41:30 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
00:41:31 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
00:41:38 - cmdstanpy - INFO - Chain [1] start processing            (1 + 1) / 2]
00:41:50 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
00:42:55 - cmdstanpy - INFO - Chain [1] start processing    

01:03:29 - cmdstanpy - INFO - Chain [1] done processing
01:03:31 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
01:03:37 - cmdstanpy - INFO - Chain [1] start processing
01:03:39 - cmdstanpy - INFO - Chain [1] start processing
01:03:59 - cmdstanpy - INFO - Chain [1] done processing
01:03:59 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
01:04:08 - cmdstanpy - INFO - Chain [1] start processing
01:04:23 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
01:05:00 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
01:05:00 - cmdstanpy - INFO - Chain [1] start processing
01:05:17 - cmdstanpy - INFO - Chain [1] done processing
01:05:20 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
01:05:26 - cmdstanpy 

01:25:10 - cmdstanpy - INFO - Chain [1] start processing
01:25:29 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
01:25:30 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
01:25:37 - cmdstanpy - INFO - Chain [1] start processing
01:25:50 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
01:26:28 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
01:26:28 - cmdstanpy - INFO - Chain [1] start processing
01:26:46 - cmdstanpy - INFO - Chain [1] done processing
01:26:47 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
01:26:54 - cmdstanpy - INFO - Chain [1] start processing
01:26:55 - cmdstanpy - INFO - Chain [1] start processing
01:27:13 - cmdstanpy - INFO - Chain [1] done processing
01:27:16 - cmdstanpy - INFO - Chain [1] done

01:45:10 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
01:45:18 - cmdstanpy - INFO - Chain [1] start processing
01:45:32 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data pointsProcessing group: Rieti with 29862 data points

01:48:19 - cmdstanpy - INFO - Chain [1] start processing
01:48:19 - cmdstanpy - INFO - Chain [1] start processing
01:48:36 - cmdstanpy - INFO - Chain [1] done processing
01:48:40 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
01:48:45 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
01:48:48 - cmdstanpy - INFO - Chain [1] start processing
01:49:04 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
01:49:06 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
01:49:1

02:06:57 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Rieti with 29862 data pointsProcessing group: Frosinone with 29862 data points

02:09:39 - cmdstanpy - INFO - Chain [1] start processing
02:09:39 - cmdstanpy - INFO - Chain [1] start processing
02:09:58 - cmdstanpy - INFO - Chain [1] done processing
02:09:59 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
02:10:06 - cmdstanpy - INFO - Chain [1] start processing
02:10:07 - cmdstanpy - INFO - Chain [1] start processing
02:10:26 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
02:10:27 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
02:10:36 - cmdstanpy - INFO - Chain [1] start processing
02:10:47 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
02

02:50:29 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
02:50:29 - cmdstanpy - INFO - Chain [1] start processing
02:50:48 - cmdstanpy - INFO - Chain [1] done processing
02:50:49 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
02:50:57 - cmdstanpy - INFO - Chain [1] start processing
02:50:57 - cmdstanpy - INFO - Chain [1] start processing
02:51:13 - cmdstanpy - INFO - Chain [1] done processing
02:51:16 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
02:51:25 - cmdstanpy - INFO - Chain [1] start processing
02:51:43 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
02:52:22 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
02:52:22 - cmdstanpy - INFO - Chain [1] start processing
02:52:41 - cmdstanpy - INFO - Chain [1] done

03:13:49 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
03:13:56 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
03:13:58 - cmdstanpy - INFO - Chain [1] start processing
03:14:14 - cmdstanpy - INFO - Chain [1] done processing
03:14:16 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
Processing group: Roma with 29862 data points
03:14:22 - cmdstanpy - INFO - Chain [1] start processing
03:14:32 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
03:15:46 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
03:15:46 - cmdstanpy - INFO - Chain [1] start processing
03:16:04 - cmdstanpy - INFO - Chain [1] done processing
03:16:07 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
03:16:12 - cmdstanpy - INFO - Chain [1] start processing
Proces

03:32:05 - cmdstanpy - INFO - Chain [1] start processing
03:32:23 - cmdstanpy - INFO - Chain [1] done processing
03:32:24 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
03:32:31 - cmdstanpy - INFO - Chain [1] start processing            (1 + 1) / 2]
03:32:44 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
03:33:20 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
03:33:20 - cmdstanpy - INFO - Chain [1] start processing
03:33:37 - cmdstanpy - INFO - Chain [1] done processing
03:33:39 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
03:33:45 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
03:33:47 - cmdstanpy - INFO - Chain [1] start processing
03:34:00 - cmdstanpy - INFO - Chain [1] done processing
03:34:01 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma w

Processing group: Roma with 29862 data points
03:46:58 - cmdstanpy - INFO - Chain [1] start processing            (1 + 1) / 2]
03:47:08 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
03:47:44 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
03:47:44 - cmdstanpy - INFO - Chain [1] start processing
03:48:02 - cmdstanpy - INFO - Chain [1] done processing
03:48:03 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
03:48:10 - cmdstanpy - INFO - Chain [1] start processing
03:48:11 - cmdstanpy - INFO - Chain [1] start processing
03:48:24 - cmdstanpy - INFO - Chain [1] done processing
03:48:25 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
03:48:33 - cmdstanpy - INFO - Chain [1] start processing
03:48:44 - cmdstanpy - INFO - Chain [1] done processing

+--------+---------+------------------+------------------+------------------+-----------------+
|    Time|     NOME|               MSE|              RMSE|               MAE|         Coverage|
+--------+---------+------------------+------------------+------------------+-----------------+
|  5 Days|  Viterbo|16.003960724139166|  4.00049505988186| 2.973204787744828|94.27705223880596|
|  5 Days|    Rieti|10.248351804787955|3.2013047035213558| 2.322585598394069| 95.0839552238806|
|  5 Days|   Latina| 20.95198798697315| 4.577334157233133|3.3648135928169416|94.10914179104478|
|  5 Days|Frosinone|21.226793891930374|4.6072544852580455|3.2211363150914343|93.95055970149254|
|  5 Days|     Roma|28.009525101490844| 5.292402583089352|3.9372356263705406|94.70149253731344|
| 30 Days|  Viterbo|15.830821758352895|3.9787965213557848| 2.958620307652071|94.45099818511797|
| 30 Days|    Rieti|10.127774617945962|3.1824164746220696| 2.313476887347684|95.18602540834846|
| 30 Days|   Latina|20.703560563272596| 

In [None]:
result_df_PM25_pd = result_df_PM25.pandas_api()
result_df_PM25_pd

PM10

In [13]:
from pyspark.sql.functions import *
from time import time
import matplotlib.pyplot as plt
import os

data_PM10_prophet = data_PM10.groupBy("NOME", "original_date_time").agg(avg("c_PM10").alias("avg_c_PM10"))
data_PM10_prophet.show()

data_PM10_prophet_u = data_PM10_prophet.withColumnRenamed("avg_c_PM10", "y").withColumnRenamed("original_date_time", "ds")
# data_PM25_prophet = data_PM25_prophet_u.withColumn("ds", to_timestamp("ds", "yyyy-MM-dd HH:mm:ss"))
data_PM10_prophet = data_PM10_prophet_u.withColumn("ds", to_timestamp("ds", "yyyy-MM-dd HH:mm:ss")) \
                                       .withColumn("ds", from_utc_timestamp("ds", "Europe/Rome"))

data_PM10_prophet.printSchema()


def train_and_forecast(group):
    
    print(f"Processing group: {group['NOME'].iloc[0]} with {len(group)} data points")
    
    # Initiate the model
    
    # Convert "ds" column to pandas datetime format and set timezone to Europe/Rome
    group["ds"] = pd.to_datetime(group["ds"])

    m = Prophet(seasonality_mode="additive",
                    yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=True,
                    interval_width=0.95)
    #m.add_country_holidays(country_name="IT")
    
    # Fit the model
    m.fit(group[group['ds'] <= cutoff_date])
    
    
    # Make predictions
    forecast = m.predict(group)

    # Create the directory if it does not exist
    path = '/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/PM10/'

    if not os.path.exists(path):
        os.makedirs(path)
    
    try:
        fig1 = m.plot(forecast)
        fig1.suptitle(f"PM10 Concentration Forecast for {group['NOME'].iloc[0]}")
        fig1.legend(['Actual', 'Forecasted'], loc='upper right')
        fig1.subplots_adjust(top=0.9)
        
        fig2 = m.plot_components(forecast)
        fig2.suptitle(f"PM10 Concentration Component Forecast for {group['NOME'].iloc[0]}")
        fig2.subplots_adjust(top=0.9)

        # Save the plots as images using Matplotlib's savefig method
        fig1.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/PM10/{group['NOME'].iloc[0]}_prophet_forecast.png")
        fig2.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/PM10/{group['NOME'].iloc[0]}_prophet_components.png")
    except Exception as e:
        print(f"Error generating plots for group {group['NOME'].iloc[0]}: {str(e)}")
    
    
    f_pd = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','trend', 'yearly', 'weekly', 'daily']].set_index("ds")
 
    group_pd = group[['ds', 'NOME', 'y']].set_index("ds")
    
    result_pd = f_pd.join( group_pd, how = "left")
    result_pd.reset_index(level=0, inplace=True)
    
    result_pd['NOME'] = group['NOME'].iloc[0]
    
    #Return the forecasted results
    return result_pd[['ds', 'NOME', 'y', 'yhat', 'yhat_upper', 'yhat_lower','trend', 'yearly', 'weekly', 'daily']] 

# Define the restult schema
result_schema =StructType([
    StructField('ds',TimestampType()),
    StructField('NOME', StringType()),
    StructField('y', DoubleType()),
    StructField('yhat',FloatType()),
    StructField('yhat_upper',FloatType()),
    StructField('yhat_lower',FloatType()),
    StructField('trend',FloatType()),
    StructField('yearly',FloatType()),
    StructField('weekly',FloatType()),
    StructField('daily',FloatType())
  ])

# create train and test data based on cutoff date
cutoff_date = "2021-06-30 23:00:00"

data_PM10_prophet_train = data_PM10_prophet.filter(col('ds') <= cutoff_date)
data_PM10_prophet_test = data_PM10_prophet.filter((col('ds') > cutoff_date))

max_date = data_PM10_prophet_test.agg(max('ds')).collect()[0][0]

period = data_PM10_prophet_test.count()

# Start time
start_time = time()
# Train and forecast by ticker 
spark_forecast_PM10 = data_PM10_prophet.groupBy("NOME").applyInPandas(train_and_forecast, schema=result_schema)
# Processing time
print('The time used for the Spark forecast is ', time()-start_time)

spark_forecast_PM10.show()

                                                                                

+-------+-------------------+------------------+
|   NOME| original_date_time|        avg_c_PM10|
+-------+-------------------+------------------+
|Viterbo|2022-02-02 01:00:00| 2.305790754424778|
|  Rieti|2021-11-01 05:00:00| 3.048606228323699|
|   Roma|2021-12-12 02:00:00| 3.200772948664688|
|  Rieti|2021-12-19 15:00:00| 6.602085763583815|
|  Rieti|2021-05-20 22:00:00|2.0480550364161845|
|  Rieti|2021-10-08 15:00:00|2.2356599597109823|
|   Roma|2021-05-22 11:00:00| 5.616283332344214|
| Latina|2021-06-10 21:00:00|  4.90211026642857|
| Latina|2021-12-13 00:00:00| 8.493514665000001|
|   Roma|2021-10-11 14:00:00| 3.549662149851631|
|Viterbo|2021-06-27 07:00:00|3.9166167650442487|
|   Roma|2021-10-11 19:00:00| 4.065098945103857|
|Viterbo|2021-10-08 00:00:00| 2.278955902212388|
|  Rieti|2021-10-08 07:00:00|2.3133043560693642|
|Viterbo|2021-12-07 13:00:00| 6.111837910619468|
| Latina|2021-05-21 09:00:00| 4.879802729999999|
|Viterbo|2021-11-29 20:00:00|3.0105132964601746|
|  Rieti|2021-03-09 

                                                                                

The time used for the Spark forecast is  0.03782367706298828


Processing group: Rieti with 29862 data points                      (0 + 1) / 1]
14:30:51 - cmdstanpy - INFO - Chain [1] start processing
14:31:03 - cmdstanpy - INFO - Chain [1] done processing


+-------------------+-----+------------------+---------+----------+----------+---------+---------+----------+-----------+
|                 ds| NOME|                 y|     yhat|yhat_upper|yhat_lower|    trend|   yearly|    weekly|      daily|
+-------------------+-----+------------------+---------+----------+----------+---------+---------+----------+-----------+
|2019-01-01 01:00:00|Rieti| 8.187913463005781|6.3781853|  15.45299| -2.698246|1.4668721|2.9743783|0.25463605|  1.6822985|
|2019-01-01 02:00:00|Rieti| 7.400683825433524| 6.099694| 15.202893| -2.624092|1.4721631| 2.975724| 0.2781975|  1.3736093|
|2019-01-01 03:00:00|Rieti|   6.2553596150289|5.7093377| 14.542471| -3.154002|1.4774541| 2.977095|0.29945415|  0.9553345|
|2019-01-01 04:00:00|Rieti| 5.694775547976879|5.3087525| 13.922252|-3.5283556|1.4827452|2.9784915| 0.3182476|  0.5292683|
|2019-01-01 05:00:00|Rieti| 5.556005865895955|5.0067577| 13.895753| -4.597168|1.4880362|2.9799137| 0.3344401| 0.20436773|
|2019-01-01 06:00:00|Rie

Processing group: Viterbo with 29862 data points
                                                                                

In [14]:
from pyspark.sql.functions import col
from pyspark.sql.functions import pow, sqrt
from prophet.diagnostics import performance_metrics


# Calculate MAE
mae_PM10 = spark_forecast_PM10.select(col("NOME"), abs(col("y") - col("yhat")).alias("error")) \
                    .groupBy("NOME") \
                    .agg({"error": "mean"}) \
                    .withColumnRenamed("avg(error)", "MAE")


# Calculate MSE
mse_PM10 = spark_forecast_PM10.select(col("NOME"), pow(col("y") - col("yhat"), 2).alias("error")) \
                    .groupBy("NOME") \
                    .agg({"error": "mean"}) \
                    .withColumnRenamed("avg(error)", "MSE")

# Calculate RMSE
rmse_PM10 = spark_forecast_PM10.select(col("NOME"), pow(col("y") - col("yhat"), 2).alias("error")) \
                     .groupBy("NOME") \
                     .agg({"error": "mean"}) \
                     .withColumnRenamed("avg(error)", "MSE") \
                     .withColumn("RMSE", sqrt(col("MSE")).alias("RMSE"))

# Show the results
mae_PM10.show()
mse_PM10.show()
rmse_PM10.show()

14:31:20 - cmdstanpy - INFO - Chain [1] start processing         (0 + 96) / 119]
14:32:00 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
Processing group: Frosinone with 29862 data points
Processing group: Rieti with 29862 data points
14:32:06 - cmdstanpy - INFO - Chain [1] start processing
14:32:06 - cmdstanpy - INFO - Chain [1] start processing
14:32:20 - cmdstanpy - INFO - Chain [1] done processing
14:32:26 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
14:32:30 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
14:32:36 - cmdstanpy - INFO - Chain [1] start processing
14:32:44 - cmdstanpy - INFO - Chain [1] done processing
14:32:53 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
14:33:02 - cmdstanpy - INFO - Chain [1] start processing
14:33:18 - cmdstanpy - INFO - Chain [1] done processing
       

+---------+------------------+
|     NOME|               MAE|
+---------+------------------+
|  Viterbo| 4.078447906746063|
|    Rieti|3.1889142430171873|
|   Latina| 4.374905542398387|
|Frosinone| 4.444598034469279|
|     Roma| 4.623973814691109|
+---------+------------------+



Processing group: Frosinone with 29862 data points
14:34:08 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
14:34:08 - cmdstanpy - INFO - Chain [1] start processing
14:34:22 - cmdstanpy - INFO - Chain [1] done processing
14:34:29 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
14:34:32 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
14:34:38 - cmdstanpy - INFO - Chain [1] start processing
14:34:47 - cmdstanpy - INFO - Chain [1] done processing
14:34:56 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
14:35:05 - cmdstanpy - INFO - Chain [1] start processing
14:35:20 - cmdstanpy - INFO - Chain [1] done processing
                                                                                

+---------+------------------+
|     NOME|               MSE|
+---------+------------------+
|  Viterbo|  36.4500033022641|
|    Rieti| 23.69100677132566|
|   Latina|  40.8396873058619|
|Frosinone|40.885865194485106|
|     Roma| 42.31326172784455|
+---------+------------------+



Processing group: Frosinone with 29862 data points
14:36:10 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
14:36:10 - cmdstanpy - INFO - Chain [1] start processing
14:36:25 - cmdstanpy - INFO - Chain [1] done processing
14:36:31 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
14:36:34 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
14:36:41 - cmdstanpy - INFO - Chain [1] start processing
14:36:48 - cmdstanpy - INFO - Chain [1] done processing
14:36:57 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
14:37:07 - cmdstanpy - INFO - Chain [1] start processing
14:37:24 - cmdstanpy - INFO - Chain [1] done processing


+---------+------------------+------------------+
|     NOME|               MSE|              RMSE|
+---------+------------------+------------------+
|  Viterbo|  36.4500033022641|  6.03738381273413|
|    Rieti| 23.69100677132566| 4.867340831637503|
|   Latina|  40.8396873058619|6.3905936583279885|
|Frosinone|40.885865194485106|6.3942055952624095|
|     Roma| 42.31326172784455| 6.504864466523845|
+---------+------------------+------------------+



                                                                                

In [13]:
#Long and Short Term Forecast

from pyspark.sql.functions import *
from time import time
import matplotlib.pyplot as plt
import os

data_PM10_prophet = data_PM10.groupBy("NOME", "original_date_time").agg(avg("c_PM10").alias("avg_c_PM10"))
data_PM10_prophet.show()

data_PM10_prophet_u = data_PM10_prophet.withColumnRenamed("avg_c_PM10", "y").withColumnRenamed("original_date_time", "ds")
# data_PM25_prophet = data_PM25_prophet_u.withColumn("ds", to_timestamp("ds", "yyyy-MM-dd HH:mm:ss"))
data_PM10_prophet = data_PM10_prophet_u.withColumn("ds", to_timestamp("ds", "yyyy-MM-dd HH:mm:ss")) \
                                       .withColumn("ds", from_utc_timestamp("ds", "Europe/Rome"))

#data_PM10_prophet.printSchema()


def train_and_forecast(group):
    
    print(f"Processing group: {group['NOME'].iloc[0]} with {len(group)} data points")
    
    # Initiate the model
    
    # Convert "ds" column to pandas datetime format and set timezone to Europe/Rome
    #group["ds"] = pd.to_datetime(group["ds"]).dt.tz_localize('Europe/Rome').dt.tz_convert(pytz.UTC)
    group["ds"] = pd.to_datetime(group["ds"])

    m = Prophet(seasonality_mode="additive",
                    yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=True,
                    interval_width=0.95)
    #m.add_country_holidays(country_name="IT")
    
    # Fit the model
    m.fit(group[group['ds'] <= cutoff_date])
    
    
    # Make predictions
    forecast = m.predict(group)


    # Create the directory if it does not exist
    path = '/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/PM10/'

    if not os.path.exists(path):
        os.makedirs(path)
    
    #try:
     #   fig1 = m.plot(forecast)
      #  fig1.suptitle(f"PM10 Concentration Forecast for {group['NOME'].iloc[0]}")
       # fig1.legend(['Actual', 'Forecasted'], loc='upper right')
       # fig1.subplots_adjust(top=0.9)
        
       # fig2 = m.plot_components(forecast)
       # fig2.suptitle(f"PM10 Concentration Component Forecast for {group['NOME'].iloc[0]}")
       # fig2.subplots_adjust(top=0.9)

        # Save the plots as images using Matplotlib's savefig method
        #fig1.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet/PM10/{group['NOME'].iloc[0]}_prophet_forecast.png")
       # fig2.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet/PM10/{group['NOME'].iloc[0]}_prophet_components.png")
   # except Exception as e:
   #     print(f"Error generating plots for group {group['NOME'].iloc[0]}: {str(e)}")
    
    
    f_pd = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','trend', 'yearly', 'weekly', 'daily']].set_index("ds")
 
    group_pd = group[['ds', 'NOME', 'y']].set_index("ds")
    
    result_pd = f_pd.join( group_pd, how = "left")
    result_pd.reset_index(level=0, inplace=True)
    
    result_pd['NOME'] = group['NOME'].iloc[0]
    
    #Return the forecasted results
    return result_pd[['ds', 'NOME', 'y', 'yhat', 'yhat_upper', 'yhat_lower','trend', 'yearly', 'weekly', 'daily']] 

# Define the restult schema
result_schema =StructType([
    StructField('ds',TimestampType()),
    StructField('NOME', StringType()),
    StructField('y', DoubleType()),
    StructField('yhat',FloatType()),
    StructField('yhat_upper',FloatType()),
    StructField('yhat_lower',FloatType()),
    StructField('trend',FloatType()),
    StructField('yearly',FloatType()),
    StructField('weekly',FloatType()),
    StructField('daily',FloatType())
  ])

# create train and test data based on cutoff date
cutoff_date = "2021-06-30 23:00:00"

data_PM10_prophet_train = data_PM10_prophet.filter(col('ds') <= cutoff_date)
data_PM10_prophet_test = data_PM10_prophet.filter((col('ds') > cutoff_date))

max_date = data_PM10_prophet_test.agg(max('ds')).collect()[0][0]

period = data_PM10_prophet_test.count()

# Start time
start_time = time()
# Train and forecast by ticker 
spark_forecast_PM10 = data_PM10_prophet.groupBy("NOME").applyInPandas(train_and_forecast, schema=result_schema)
# Processing time
print('The time used for the Spark forecast is ', time()-start_time)

#spark_forecast_PM10.show()

                                                                                

+-------+-------------------+------------------+
|   NOME| original_date_time|        avg_c_PM10|
+-------+-------------------+------------------+
|Viterbo|2022-02-02 01:00:00| 2.305790754424778|
|  Rieti|2021-11-01 05:00:00| 3.048606228323699|
|   Roma|2021-12-12 02:00:00| 3.200772948664688|
|  Rieti|2021-12-19 15:00:00| 6.602085763583815|
|  Rieti|2021-05-20 22:00:00|2.0480550364161845|
|  Rieti|2021-10-08 15:00:00|2.2356599597109823|
|   Roma|2021-05-22 11:00:00| 5.616283332344214|
| Latina|2021-06-10 21:00:00|  4.90211026642857|
| Latina|2021-12-13 00:00:00| 8.493514665000001|
|   Roma|2021-10-11 14:00:00| 3.549662149851631|
|Viterbo|2021-06-27 07:00:00|3.9166167650442487|
|   Roma|2021-10-11 19:00:00| 4.065098945103857|
|Viterbo|2021-10-08 00:00:00| 2.278955902212388|
|  Rieti|2021-10-08 07:00:00|2.3133043560693642|
|Viterbo|2021-12-07 13:00:00| 6.111837910619468|
| Latina|2021-05-21 09:00:00| 4.879802729999999|
|Viterbo|2021-11-29 20:00:00|3.0105132964601746|
|  Rieti|2021-03-09 



The time used for the Spark forecast is  0.04484677314758301


                                                                                

In [14]:
from pyspark.sql.functions import col, pow, sqrt

# Define cutoff dates
cutoff_dates = {
    "5 Days": "2021-07-05 23:00:00",
    "30 Days": "2021-07-30 23:00:00",
    "90 Days": "2021-09-28 23:00:00",
    "180 Days": "2021-12-27 23:00:00",
    "365 Days": "2022-06-29 23:00:00"
}

# Define a list to store the evaluation results
results = []

# Compute evaluation metrics for each time period and NOME
for period, cutoff in cutoff_dates.items():
    for nome in spark_forecast_PM10.select("NOME").distinct().rdd.flatMap(lambda x: x).collect():
        # Filter the forecast data based on the cutoff date and NOME
        forecast_data_PM10 = spark_forecast_PM10.filter((col("ds") <= cutoff) & (col("NOME") == nome))

        # Calculate MAE
        mae = forecast_data_PM10.select(abs(col("y") - col("yhat")).alias("error")) \
                             .agg({"error": "mean"}) \
                             .withColumnRenamed("avg(error)", "MAE") \
                             .collect()[0]["MAE"]

        # Calculate MSE
        mse = forecast_data_PM10.select(pow(col("y") - col("yhat"), 2).alias("error")) \
                             .agg({"error": "mean"}) \
                             .withColumnRenamed("avg(error)", "MSE") \
                             .collect()[0]["MSE"]

        # Calculate RMSE
        rmse = forecast_data_PM10.select(pow(col("y") - col("yhat"), 2).alias("error")) \
                              .agg({"error": "mean"}) \
                              .withColumnRenamed("avg(error)", "MSE") \
                              .withColumn("RMSE", sqrt(col("MSE")).alias("RMSE")) \
                              .collect()[0]["RMSE"]

        # Calculate coverage
        coverage = forecast_data_PM10.filter((col("y") >= col("yhat_lower")) & (col("y") <= col("yhat_upper"))) \
                                  .count() / forecast_data_PM10.count() * 100

        # Add the results to the list
        results.append((period, nome, mse, rmse, mae, coverage))

# Create a Spark DataFrame from the results list
result_df_PM10 = spark.createDataFrame(results, ["Time", "NOME", "MSE", "RMSE", "MAE", "Coverage"])

# Show the results
result_df_PM10.show(100)

Processing group: Frosinone with 29862 data pointsProcessing group: Rieti with 29862 data points

03:51:24 - cmdstanpy - INFO - Chain [1] start processing
03:51:24 - cmdstanpy - INFO - Chain [1] start processing
03:51:35 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
03:51:43 - cmdstanpy - INFO - Chain [1] start processing
03:51:43 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
03:51:51 - cmdstanpy - INFO - Chain [1] start processing
03:51:54 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
03:52:05 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
03:52:13 - cmdstanpy - INFO - Chain [1] start processing
03:52:28 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
03:53:05 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
03:53:05 - c

04:06:57 - cmdstanpy - INFO - Chain [1] done processing
04:07:04 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
04:07:05 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
04:07:12 - cmdstanpy - INFO - Chain [1] start processing
04:07:19 - cmdstanpy - INFO - Chain [1] done processing
04:07:27 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
04:07:35 - cmdstanpy - INFO - Chain [1] start processing
04:07:49 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
04:08:24 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
04:08:24 - cmdstanpy - INFO - Chain [1] start processing
04:08:39 - cmdstanpy - INFO - Chain [1] done processing
04:08:45 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
04:08:47 - cmdstanpy - INFO - C

Processing group: Latina with 29862 data points
04:22:34 - cmdstanpy - INFO - Chain [1] start processing
04:22:42 - cmdstanpy - INFO - Chain [1] done processing
04:22:49 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
04:22:57 - cmdstanpy - INFO - Chain [1] start processing
04:23:11 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
04:23:51 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
04:23:51 - cmdstanpy - INFO - Chain [1] start processing
04:24:06 - cmdstanpy - INFO - Chain [1] done processing
04:24:11 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
04:24:14 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
04:24:18 - cmdstanpy - INFO - Chain [1] start processing
04:24:25 - cmdstanpy - INFO - Chain [1] done processing
04:24:31 - cmdstanpy 

04:53:18 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
04:53:58 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
04:53:58 - cmdstanpy - INFO - Chain [1] start processing
04:54:12 - cmdstanpy - INFO - Chain [1] done processing
04:54:16 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
04:54:20 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
04:54:24 - cmdstanpy - INFO - Chain [1] start processing
04:54:31 - cmdstanpy - INFO - Chain [1] done processing
04:54:37 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
04:54:44 - cmdstanpy - INFO - Chain [1] start processing
04:54:59 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
04:55:35 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2)

05:09:03 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
05:09:03 - cmdstanpy - INFO - Chain [1] start processing
05:09:17 - cmdstanpy - INFO - Chain [1] done processing
05:09:23 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
05:09:25 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
05:09:31 - cmdstanpy - INFO - Chain [1] start processing
05:09:40 - cmdstanpy - INFO - Chain [1] done processing
05:09:47 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
05:09:55 - cmdstanpy - INFO - Chain [1] start processing
05:10:09 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
05:10:46 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
05:10:46 - cmdstanpy - INFO - Chain [1] start processing
05:11:01 - cmdstanpy - INFO - Chain [1] done processin

Processing group: Viterbo with 29862 data points
05:24:39 - cmdstanpy - INFO - Chain [1] done processing
05:24:41 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
05:24:47 - cmdstanpy - INFO - Chain [1] start processing
05:24:55 - cmdstanpy - INFO - Chain [1] done processing
05:25:03 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
05:25:10 - cmdstanpy - INFO - Chain [1] start processing
05:25:26 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
05:26:01 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
05:26:01 - cmdstanpy - INFO - Chain [1] start processing
05:26:14 - cmdstanpy - INFO - Chain [1] done processing
05:26:18 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
05:26:22 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 

05:40:02 - cmdstanpy - INFO - Chain [1] start processing
05:40:11 - cmdstanpy - INFO - Chain [1] done processing
05:40:17 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points                       (1 + 1) / 2]
05:40:25 - cmdstanpy - INFO - Chain [1] start processing
05:40:39 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points==>              (8 + 3) / 11]
Processing group: Rieti with 29862 data points
05:41:20 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
05:41:20 - cmdstanpy - INFO - Chain [1] start processing
05:41:35 - cmdstanpy - INFO - Chain [1] done processing
05:41:41 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
05:41:43 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
05:41:49 - cmdstanpy - INFO - Chain [1] start processing
05:41:55 - cmdstanpy - INFO - Chain [1] done pro

06:11:26 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
06:12:03 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
06:12:03 - cmdstanpy - INFO - Chain [1] start processing
06:12:18 - cmdstanpy - INFO - Chain [1] done processing
06:12:24 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
06:12:26 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
06:12:32 - cmdstanpy - INFO - Chain [1] start processing
06:12:38 - cmdstanpy - INFO - Chain [1] done processing
06:12:46 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
06:12:54 - cmdstanpy - INFO - Chain [1] start processing
06:13:08 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
06:13:47 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
06:1

06:27:24 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
06:27:24 - cmdstanpy - INFO - Chain [1] start processing
06:27:37 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
06:27:44 - cmdstanpy - INFO - Chain [1] done processing
06:27:46 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
06:27:52 - cmdstanpy - INFO - Chain [1] start processing
06:28:00 - cmdstanpy - INFO - Chain [1] done processing
06:28:08 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
06:28:15 - cmdstanpy - INFO - Chain [1] start processing
06:28:30 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
06:29:05 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
06:29:05 - cmdstanpy - INFO - Chain [1] start processing
06:29:19 - cmdstanpy - INFO - Chain [1] done processin

06:43:03 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
06:43:05 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
06:43:11 - cmdstanpy - INFO - Chain [1] start processing
06:43:18 - cmdstanpy - INFO - Chain [1] done processing
06:43:25 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
06:43:33 - cmdstanpy - INFO - Chain [1] start processing
06:43:48 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
06:44:26 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
06:44:26 - cmdstanpy - INFO - Chain [1] start processing
06:44:40 - cmdstanpy - INFO - Chain [1] done processing
06:44:45 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
06:44:48 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 

06:58:28 - cmdstanpy - INFO - Chain [1] start processing
06:58:35 - cmdstanpy - INFO - Chain [1] done processing
06:58:43 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
06:58:51 - cmdstanpy - INFO - Chain [1] start processing
06:59:05 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
06:59:45 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
06:59:45 - cmdstanpy - INFO - Chain [1] start processing
07:00:00 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
07:00:06 - cmdstanpy - INFO - Chain [1] done processing
07:00:08 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Latina with 29862 data points
07:00:14 - cmdstanpy - INFO - Chain [1] start processing
07:00:21 - cmdstanpy - INFO - Chain [1] done processing
07:00:30 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2

07:29:31 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points
07:30:08 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
07:30:08 - cmdstanpy - INFO - Chain [1] start processing
07:30:20 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
07:30:28 - cmdstanpy - INFO - Chain [1] start processing
07:30:28 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
07:30:36 - cmdstanpy - INFO - Chain [1] start processing
07:30:41 - cmdstanpy - INFO - Chain [1] done processing
07:30:52 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Roma with 29862 data points
07:31:00 - cmdstanpy - INFO - Chain [1] start processing
07:31:15 - cmdstanpy - INFO - Chain [1] done processing
                                                                                

+--------+---------+------------------+------------------+------------------+-----------------+
|    Time|     NOME|               MSE|              RMSE|               MAE|         Coverage|
+--------+---------+------------------+------------------+------------------+-----------------+
|  5 Days|  Viterbo|26.844312457867375|5.1811497235524255| 3.594806387887776|96.00746268656717|
|  5 Days|    Rieti| 20.38033065926595| 4.514457958522368| 3.031855682501042|96.40391791044776|
|  5 Days|   Latina| 34.89463072468919| 5.907167741370579| 4.122590530941324| 96.0867537313433|
|  5 Days|Frosinone| 34.76652994378737| 5.896314946115698|3.9815671910379296|96.04944029850746|
|  5 Days|     Roma| 40.36191354227106|6.3531026705280835| 4.550351683195777| 95.4757462686567|
| 30 Days|  Viterbo|26.486506043502285| 5.146504254686115| 3.571681776125837|96.00725952813067|
| 30 Days|    Rieti|20.030732943802104| 4.475570683589089|3.0080590996988046|96.42468239564428|
| 30 Days|   Latina| 34.33310547116971| 

In [None]:
result_df_PM10_pd = result_df_PM10.pandas_api()
result_df_PM10_pd

NO2

In [15]:
from pyspark.sql.functions import *
from time import time
import matplotlib.pyplot as plt
import os

data_NO2_prophet = data_NO2.groupBy("NOME", "original_date_time").agg(avg("c_NO2").alias("avg_c_NO2"))
data_NO2_prophet.show()

data_NO2_prophet_u = data_NO2_prophet.withColumnRenamed("avg_c_NO2", "y").withColumnRenamed("original_date_time", "ds")

data_NO2_prophet = data_NO2_prophet_u.withColumn("ds", to_timestamp("ds", "yyyy-MM-dd HH:mm:ss")) \
                                       .withColumn("ds", from_utc_timestamp("ds", "Europe/Rome"))

data_NO2_prophet.printSchema()

def train_and_forecast(group):
    
    print(f"Processing group: {group['NOME'].iloc[0]} with {len(group)} data points")
    
    group["ds"] = pd.to_datetime(group["ds"])

    m = Prophet(seasonality_mode="additive",
                    yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=True,
                    interval_width=0.95)
    
    m.fit(group[group['ds'] <= cutoff_date])
    
    forecast = m.predict(group)
    
    path = '/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/NO2/'

    if not os.path.exists(path):
        os.makedirs(path)
    
    try:
        fig1 = m.plot(forecast)
        fig1.suptitle(f"NO2 Concentration Forecast for {group['NOME'].iloc[0]}")
        fig1.legend(['Actual', 'Forecasted'], loc='upper right')
        fig1.subplots_adjust(top=0.9)
        
        fig2 = m.plot_components(forecast)
        fig2.suptitle(f"NO2 Concentration Component Forecast for {group['NOME'].iloc[0]}")
        fig2.subplots_adjust(top=0.9)

        fig1.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/NO2/{group['NOME'].iloc[0]}_prophet_forecast.png")
        fig2.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/NO2/{group['NOME'].iloc[0]}_prophet_components.png")
    except Exception as e:
        print(f"Error generating plots for group {group['NOME'].iloc[0]}: {str(e)}")
    
    
    f_pd = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','trend', 'yearly', 'weekly', 'daily']].set_index("ds")
 
    group_pd = group[['ds', 'NOME', 'y']].set_index("ds")
    
    result_pd = f_pd.join( group_pd, how = "left")
    result_pd.reset_index(level=0, inplace=True)
    
    result_pd['NOME'] = group['NOME'].iloc[0]
    
    return result_pd[['ds', 'NOME', 'y', 'yhat', 'yhat_upper', 'yhat_lower','trend', 'yearly', 'weekly', 'daily']] 

result_schema =StructType([
    StructField('ds',TimestampType()),
    StructField('NOME', StringType()),
    StructField('y', DoubleType()),
    StructField('yhat',FloatType()),
    StructField('yhat_upper',FloatType()),
    StructField('yhat_lower',FloatType()),
    StructField('trend',FloatType()),
    StructField('yearly',FloatType()),
    StructField('weekly',FloatType()),
    StructField('daily',FloatType())
  ])

# create train and test data based on cutoff date
cutoff_date = "2021-06-30 23:00:00"

data_NO2_prophet_train = data_NO2_prophet.filter(col('ds') <= cutoff_date)
data_NO2_prophet_test = data_NO2_prophet.filter((col('ds') > cutoff_date))

max_date = data_NO2_prophet_test.agg(max('ds')).collect()[0][0]

period = data_NO2_prophet_test.count()

# Start time
start_time = time()
# Train and forecast by ticker 
spark_forecast_NO2 = data_NO2_prophet.groupBy("NOME").applyInPandas(train_and_forecast, schema=result_schema)
# Processing time
print('The time used for the Spark forecast is ', time()-start_time)

spark_forecast_NO2.show()


                                                                                

+-------+-------------------+------------------+
|   NOME| original_date_time|         avg_c_NO2|
+-------+-------------------+------------------+
|Viterbo|2022-02-02 01:00:00| 1.714137597787611|
|  Rieti|2021-11-01 05:00:00|0.9745878651445088|
|   Roma|2021-12-12 02:00:00|1.6926365163798216|
|  Rieti|2021-12-19 15:00:00| 2.121068303236994|
|  Rieti|2021-05-20 22:00:00|0.7832120415606936|
|  Rieti|2021-10-08 15:00:00|0.8941777683815028|
|   Roma|2021-05-22 11:00:00|1.8448723416023736|
| Latina|2021-06-10 21:00:00|3.7493962421428577|
| Latina|2021-12-13 00:00:00|4.7179716296428555|
|   Roma|2021-10-11 14:00:00|3.7846130554896145|
|Viterbo|2021-06-27 07:00:00|1.2218618397345131|
|   Roma|2021-10-11 19:00:00| 8.132516511928783|
|Viterbo|2021-10-08 00:00:00|1.8678180108407085|
|  Rieti|2021-10-08 07:00:00|2.0554645594797685|
|Viterbo|2021-12-07 13:00:00| 2.421045396017698|
| Latina|2021-05-21 09:00:00| 4.027409731500001|
|Viterbo|2021-11-29 20:00:00|3.6873097438053106|
|  Rieti|2021-03-09 

                                                                                

The time used for the Spark forecast is  0.03974723815917969


Processing group: Rieti with 29862 data points                      (0 + 1) / 1]
14:41:41 - cmdstanpy - INFO - Chain [1] start processing
14:41:53 - cmdstanpy - INFO - Chain [1] done processing


+-------------------+-----+------------------+---------+----------+-----------+---------+---------+----------+------------+
|                 ds| NOME|                 y|     yhat|yhat_upper| yhat_lower|    trend|   yearly|    weekly|       daily|
+-------------------+-----+------------------+---------+----------+-----------+---------+---------+----------+------------+
|2019-01-01 01:00:00|Rieti| 9.358907479190748| 8.854709| 13.425096|   4.283503| 3.740851|2.0150287| 0.6757538|   2.4230754|
|2019-01-01 02:00:00|Rieti| 7.972921513294797|8.0530815| 12.637769|  3.6594803|3.7411084|2.0153096| 0.6948589|    1.601805|
|2019-01-01 03:00:00|Rieti| 5.749612904335258| 7.221477|  11.67015|  2.7575915|3.7413657|2.0155883| 0.7088098|  0.75571316|
|2019-01-01 04:00:00|Rieti| 4.340772004624278|6.4760404| 10.814097|  2.0253658|3.7416232|2.0158653| 0.7178291|7.2275667E-4|
|2019-01-01 05:00:00|Rieti|3.7630883457225437|5.8987403| 10.375546|    1.06187|3.7418804|2.0161407|0.72217727|  -0.5814582|
|2019-01

Processing group: Viterbo with 29862 data points
                                                                                

In [16]:
from pyspark.sql.functions import col
from pyspark.sql.functions import pow, sqrt
from prophet.diagnostics import performance_metrics


# Calculate MAE
mae_NO2 = spark_forecast_NO2.select(col("NOME"), abs(col("y") - col("yhat")).alias("error")) \
                    .groupBy("NOME") \
                    .agg({"error": "mean"}) \
                    .withColumnRenamed("avg(error)", "MAE")

# Calculate MSE
mse_NO2 = spark_forecast_NO2.select(col("NOME"), pow(col("y") - col("yhat"), 2).alias("error")) \
                    .groupBy("NOME") \
                    .agg({"error": "mean"}) \
                    .withColumnRenamed("avg(error)", "MSE")

# Calculate RMSE
rmse_NO2 = spark_forecast_NO2.select(col("NOME"), pow(col("y") - col("yhat"), 2).alias("error")) \
                     .groupBy("NOME") \
                     .agg({"error": "mean"}) \
                     .withColumnRenamed("avg(error)", "MSE") \
                     .withColumn("RMSE", sqrt(col("MSE")).alias("RMSE"))

# Show the results
mae_NO2.show()
mse_NO2.show()
rmse_NO2.show()

14:42:05 - cmdstanpy - INFO - Chain [1] start processing         (0 + 96) / 119]
14:42:22 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points                  (0 + 2) / 2]
Processing group: Rieti with 29862 data points
14:42:49 - cmdstanpy - INFO - Chain [1] start processing
14:42:49 - cmdstanpy - INFO - Chain [1] start processing
14:43:01 - cmdstanpy - INFO - Chain [1] done processing
14:43:02 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
14:43:11 - cmdstanpy - INFO - Chain [1] start processing
14:43:12 - cmdstanpy - INFO - Chain [1] start processing
14:43:21 - cmdstanpy - INFO - Chain [1] done processing
14:43:26 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
14:43:31 - cmdstanpy - INFO - Chain [1] start processing
14:43:37 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
  

+---------+------------------+
|     NOME|               MAE|
+---------+------------------+
|  Viterbo| 2.108067556023767|
|    Rieti|1.5675482441877882|
|   Latina| 2.919243988286872|
|Frosinone|3.2363979006302803|
|     Roma| 6.282921837423628|
+---------+------------------+



Processing group: Frosinone with 29862 data points
14:44:28 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
14:44:28 - cmdstanpy - INFO - Chain [1] start processing
14:44:41 - cmdstanpy - INFO - Chain [1] done processing
14:44:41 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
Processing group: Latina with 29862 data points
14:44:51 - cmdstanpy - INFO - Chain [1] start processing
14:44:51 - cmdstanpy - INFO - Chain [1] start processing
14:45:02 - cmdstanpy - INFO - Chain [1] done processing
14:45:06 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
14:45:11 - cmdstanpy - INFO - Chain [1] start processing
14:45:17 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
                                                                                

+---------+------------------+
|     NOME|               MSE|
+---------+------------------+
|  Viterbo| 8.659769578120732|
|    Rieti| 4.817593283788651|
|   Latina|16.182939430452944|
|Frosinone| 17.40985050604961|
|     Roma| 65.03110489057981|
+---------+------------------+



Processing group: Frosinone with 29862 data points
14:46:09 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
14:46:09 - cmdstanpy - INFO - Chain [1] start processing
14:46:21 - cmdstanpy - INFO - Chain [1] done processing
14:46:22 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
14:46:30 - cmdstanpy - INFO - Chain [1] start processing
14:46:32 - cmdstanpy - INFO - Chain [1] start processing
14:46:41 - cmdstanpy - INFO - Chain [1] done processing
14:46:46 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
14:46:51 - cmdstanpy - INFO - Chain [1] start processing
14:46:57 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]


+---------+------------------+------------------+
|     NOME|               MSE|              RMSE|
+---------+------------------+------------------+
|  Viterbo| 8.659769578120732|2.9427486433810026|
|    Rieti| 4.817593283788651| 2.194901656974328|
|   Latina|16.182939430452944| 4.022802434926795|
|Frosinone| 17.40985050604961| 4.172511294897787|
|     Roma| 65.03110489057981| 8.064186560997943|
+---------+------------------+------------------+



                                                                                

In [15]:
from pyspark.sql.functions import *
from time import time
import matplotlib.pyplot as plt
import os

data_NO2_prophet = data_NO2.groupBy("NOME", "original_date_time").agg(avg("c_NO2").alias("avg_c_NO2"))
#data_NO2_prophet.show()

data_NO2_prophet_u = data_NO2_prophet.withColumnRenamed("avg_c_NO2", "y").withColumnRenamed("original_date_time", "ds")

data_NO2_prophet = data_NO2_prophet_u.withColumn("ds", to_timestamp("ds", "yyyy-MM-dd HH:mm:ss")) \
                                       .withColumn("ds", from_utc_timestamp("ds", "Europe/Rome"))

#data_NO2_prophet.printSchema()

def train_and_forecast(group):
    
    print(f"Processing group: {group['NOME'].iloc[0]} with {len(group)} data points")
    
    group["ds"] = pd.to_datetime(group["ds"])

    m = Prophet(seasonality_mode="additive",
                    yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=True,
                    interval_width=0.95)
    
    m.fit(group[group['ds'] <= cutoff_date])
    
    forecast = m.predict(group)
    
    path = '/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/NO2/'

    if not os.path.exists(path):
        os.makedirs(path)
    
    #try:
     #   fig1 = m.plot(forecast)
      #  fig1.suptitle(f"NO2 Concentration Forecast for {group['NOME'].iloc[0]}")
       # fig1.legend(['Actual', 'Forecasted'], loc='upper right')
        #fig1.subplots_adjust(top=0.9)
        
        #fig2 = m.plot_components(forecast)
        #fig2.suptitle(f"NO2 Concentration Component Forecast for {group['NOME'].iloc[0]}")
        #fig2.subplots_adjust(top=0.9)

        #fig1.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet/NO2/{group['NOME'].iloc[0]}_prophet_forecast.png")
        #fig2.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet/NO2/{group['NOME'].iloc[0]}_prophet_components.png")
    #except Exception as e:
     #   print(f"Error generating plots for group {group['NOME'].iloc[0]}: {str(e)}")
    
    
    f_pd = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','trend', 'yearly', 'weekly', 'daily']].set_index("ds")
 
    group_pd = group[['ds', 'NOME', 'y']].set_index("ds")
    
    result_pd = f_pd.join( group_pd, how = "left")
    result_pd.reset_index(level=0, inplace=True)
    
    result_pd['NOME'] = group['NOME'].iloc[0]
    
    return result_pd[['ds', 'NOME', 'y', 'yhat', 'yhat_upper', 'yhat_lower','trend', 'yearly', 'weekly', 'daily']] 

result_schema =StructType([
    StructField('ds',TimestampType()),
    StructField('NOME', StringType()),
    StructField('y', DoubleType()),
    StructField('yhat',FloatType()),
    StructField('yhat_upper',FloatType()),
    StructField('yhat_lower',FloatType()),
    StructField('trend',FloatType()),
    StructField('yearly',FloatType()),
    StructField('weekly',FloatType()),
    StructField('daily',FloatType())
  ])

# create train and test data based on cutoff date
cutoff_date = "2021-06-30 23:00:00"

data_NO2_prophet_train = data_NO2_prophet.filter(col('ds') <= cutoff_date)
data_NO2_prophet_test = data_NO2_prophet.filter((col('ds') > cutoff_date))

max_date = data_NO2_prophet_test.agg(max('ds')).collect()[0][0]

period = data_NO2_prophet_test.count()

# Start time
start_time = time()
# Train and forecast by ticker 
spark_forecast_NO2 = data_NO2_prophet.groupBy("NOME").applyInPandas(train_and_forecast, schema=result_schema)
# Processing time
print('The time used for the Spark forecast is ', time()-start_time)

#spark_forecast_NO2.show()




The time used for the Spark forecast is  0.03573036193847656


                                                                                

In [16]:
from pyspark.sql.functions import col, pow, sqrt

# Define cutoff dates
cutoff_dates = {
    "5 Days": "2021-07-05 23:00:00",
    "30 Days": "2021-07-30 23:00:00",
    "90 Days": "2021-09-28 23:00:00",
    "180 Days": "2021-12-27 23:00:00",
    "365 Days": "2022-06-29 23:00:00"
}

# Define a list to store the evaluation results
results = []

# Compute evaluation metrics for each time period and NOME
for period, cutoff in cutoff_dates.items():
    for nome in spark_forecast_NO2.select("NOME").distinct().rdd.flatMap(lambda x: x).collect():
        # Filter the forecast data based on the cutoff date and NOME
        forecast_data_NO2 = spark_forecast_NO2.filter((col("ds") <= cutoff) & (col("NOME") == nome))

        # Calculate MAE
        mae = forecast_data_NO2.select(abs(col("y") - col("yhat")).alias("error")) \
                             .agg({"error": "mean"}) \
                             .withColumnRenamed("avg(error)", "MAE") \
                             .collect()[0]["MAE"]

        # Calculate MSE
        mse = forecast_data_NO2.select(pow(col("y") - col("yhat"), 2).alias("error")) \
                             .agg({"error": "mean"}) \
                             .withColumnRenamed("avg(error)", "MSE") \
                             .collect()[0]["MSE"]

        # Calculate RMSE
        rmse = forecast_data_NO2.select(pow(col("y") - col("yhat"), 2).alias("error")) \
                              .agg({"error": "mean"}) \
                              .withColumnRenamed("avg(error)", "MSE") \
                              .withColumn("RMSE", sqrt(col("MSE")).alias("RMSE")) \
                              .collect()[0]["RMSE"]

        # Calculate coverage
        coverage = forecast_data_NO2.filter((col("y") >= col("yhat_lower")) & (col("y") <= col("yhat_upper"))) \
                                  .count() / forecast_data_NO2.count() * 100

        # Add the results to the list
        results.append((period, nome, mse, rmse, mae, coverage))

# Create a Spark DataFrame from the results list
result_df_NO2 = spark.createDataFrame(results, ["Time", "NOME", "MSE", "RMSE", "MAE", "Coverage"])

# Show the results
result_df_NO2.show(100)

Processing group: Rieti with 29862 data points                      (0 + 2) / 2]
Processing group: Frosinone with 29862 data points
07:33:34 - cmdstanpy - INFO - Chain [1] start processing
07:33:34 - cmdstanpy - INFO - Chain [1] start processing
07:33:46 - cmdstanpy - INFO - Chain [1] done processing
07:33:47 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
07:33:54 - cmdstanpy - INFO - Chain [1] start processing
07:33:55 - cmdstanpy - INFO - Chain [1] start processing
07:34:02 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
07:34:06 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
07:34:10 - cmdstanpy - INFO - Chain [1] start processing
07:34:16 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Rieti with 29862 data points
07:34:36 - cmdstanpy - INFO - Chain [1] start processing       

Processing group: Viterbo with 29862 data points
Processing group: Latina with 29862 data points
07:57:22 - cmdstanpy - INFO - Chain [1] start processing
07:57:23 - cmdstanpy - INFO - Chain [1] start processing
07:57:31 - cmdstanpy - INFO - Chain [1] done processing
07:57:35 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
07:57:39 - cmdstanpy - INFO - Chain [1] start processing
07:57:45 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
07:58:25 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
07:58:26 - cmdstanpy - INFO - Chain [1] start processing
07:58:34 - cmdstanpy - INFO - Chain [1] done processing
07:58:35 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
07:58:42 - cmdstanpy - INFO - Chain [1] start processing
07:58:43 - cmdstanpy - INFO - Chain [1

08:09:23 - cmdstanpy - INFO - Chain [1] done processing
08:09:27 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
08:09:31 - cmdstanpy - INFO - Chain [1] start processing
08:09:37 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
08:10:16 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
08:10:17 - cmdstanpy - INFO - Chain [1] start processing
08:10:26 - cmdstanpy - INFO - Chain [1] done processing
08:10:27 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
08:10:34 - cmdstanpy - INFO - Chain [1] start processing
08:10:34 - cmdstanpy - INFO - Chain [1] start processing
08:10:43 - cmdstanpy - INFO - Chain [1] done processing
08:10:47 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
08:10:51 - cmdstanpy - INFO - Chai

08:21:27 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
08:22:07 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
08:22:07 - cmdstanpy - INFO - Chain [1] start processing
08:22:19 - cmdstanpy - INFO - Chain [1] done processing
08:22:19 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
08:22:26 - cmdstanpy - INFO - Chain [1] start processing
08:22:27 - cmdstanpy - INFO - Chain [1] start processing
08:22:33 - cmdstanpy - INFO - Chain [1] done processing
08:22:38 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
08:22:42 - cmdstanpy - INFO - Chain [1] start processing
08:22:48 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
08:23:28 - cmdstanpy - INFO - Chain [1] start processing    

08:34:03 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
08:34:03 - cmdstanpy - INFO - Chain [1] start processing
08:34:13 - cmdstanpy - INFO - Chain [1] done processing
08:34:15 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
08:34:21 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
08:34:23 - cmdstanpy - INFO - Chain [1] start processing
08:34:28 - cmdstanpy - INFO - Chain [1] done processing
08:34:34 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
08:34:36 - cmdstanpy - INFO - Chain [1] start processing
08:34:43 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
08:35:24 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
08:35:24 - cmdstanpy - INFO - Chain [1] start processing
08:35:36 - cmdstanpy - INFO - Chain [1] done processin

Processing group: Viterbo with 29862 data points
Processing group: Latina with 29862 data points
08:46:27 - cmdstanpy - INFO - Chain [1] start processing
08:46:27 - cmdstanpy - INFO - Chain [1] start processing
08:46:36 - cmdstanpy - INFO - Chain [1] done processing
08:46:38 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
08:46:44 - cmdstanpy - INFO - Chain [1] start processing
08:46:50 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
08:47:33 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
08:47:33 - cmdstanpy - INFO - Chain [1] start processing
08:47:43 - cmdstanpy - INFO - Chain [1] done processing
08:47:45 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
08:47:51 - cmdstanpy - INFO - Chain [1] start processing
08:47:52 - cmdstanpy - INFO - Chain [1

08:58:29 - cmdstanpy - INFO - Chain [1] done processing
08:58:31 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
08:58:36 - cmdstanpy - INFO - Chain [1] start processing
08:58:42 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
08:59:24 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
08:59:24 - cmdstanpy - INFO - Chain [1] start processing
08:59:33 - cmdstanpy - INFO - Chain [1] done processing
08:59:35 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
08:59:41 - cmdstanpy - INFO - Chain [1] start processing
08:59:43 - cmdstanpy - INFO - Chain [1] start processing
08:59:51 - cmdstanpy - INFO - Chain [1] done processing
08:59:56 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
08:59:59 - cmdstanpy - INFO - Chai

09:10:28 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
09:11:07 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
09:11:07 - cmdstanpy - INFO - Chain [1] start processing
09:11:19 - cmdstanpy - INFO - Chain [1] done processing
09:11:20 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Viterbo with 29862 data points
Processing group: Latina with 29862 data points
09:11:27 - cmdstanpy - INFO - Chain [1] start processing
09:11:27 - cmdstanpy - INFO - Chain [1] start processing
09:11:34 - cmdstanpy - INFO - Chain [1] done processing
09:11:39 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
09:11:43 - cmdstanpy - INFO - Chain [1] start processing
09:11:50 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
09:12:30 - cmdstanpy - INFO - Chain [1] start processing    

09:23:04 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
09:23:04 - cmdstanpy - INFO - Chain [1] start processing
09:23:14 - cmdstanpy - INFO - Chain [1] done processing
09:23:15 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
09:23:22 - cmdstanpy - INFO - Chain [1] start processing
09:23:23 - cmdstanpy - INFO - Chain [1] start processing
09:23:29 - cmdstanpy - INFO - Chain [1] done processing
09:23:33 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
09:23:37 - cmdstanpy - INFO - Chain [1] start processing
09:23:43 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
09:24:20 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
09:24:21 - cmdstanpy - INFO - Chain [1] start processing
09:24:32 - cmdstanpy - INFO - Chain [1] done processin

Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
09:35:07 - cmdstanpy - INFO - Chain [1] start processing
09:35:08 - cmdstanpy - INFO - Chain [1] start processing
09:35:14 - cmdstanpy - INFO - Chain [1] done processing
09:35:20 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
09:35:22 - cmdstanpy - INFO - Chain [1] start processing
09:35:28 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
09:36:07 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
09:36:07 - cmdstanpy - INFO - Chain [1] start processing
09:36:19 - cmdstanpy - INFO - Chain [1] done processing
09:36:19 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
09:36:27 - cmdstanpy - INFO - Chain [1] start processing
09:36:27 - cmdstanpy - INFO - Chain [1

09:47:08 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
09:47:14 - cmdstanpy - INFO - Chain [1] done processing
09:47:16 - cmdstanpy - INFO - Chain [1] start processing
09:47:22 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
09:48:03 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
09:48:03 - cmdstanpy - INFO - Chain [1] start processing
09:48:15 - cmdstanpy - INFO - Chain [1] done processing
09:48:17 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
09:48:23 - cmdstanpy - INFO - Chain [1] start processing
09:48:24 - cmdstanpy - INFO - Chain [1] start processing
09:48:33 - cmdstanpy - INFO - Chain [1] done processing
09:48:38 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
09:48:41 - cmdstanpy - INFO - Chai

09:59:21 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
10:00:02 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
10:00:02 - cmdstanpy - INFO - Chain [1] start processing
10:00:13 - cmdstanpy - INFO - Chain [1] done processing
10:00:14 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
10:00:21 - cmdstanpy - INFO - Chain [1] start processing
10:00:22 - cmdstanpy - INFO - Chain [1] start processing
10:00:30 - cmdstanpy - INFO - Chain [1] done processing
10:00:34 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
10:00:38 - cmdstanpy - INFO - Chain [1] start processing
10:00:44 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
10:01:24 - cmdstanpy - INFO - Chain [1] start processing    

10:12:06 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
10:12:06 - cmdstanpy - INFO - Chain [1] start processing
10:12:19 - cmdstanpy - INFO - Chain [1] done processing
10:12:19 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
10:12:27 - cmdstanpy - INFO - Chain [1] start processing
10:12:27 - cmdstanpy - INFO - Chain [1] start processing
10:12:36 - cmdstanpy - INFO - Chain [1] done processing
10:12:40 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
10:12:44 - cmdstanpy - INFO - Chain [1] start processing
10:12:50 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
10:13:27 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
10:13:27 - cmdstanpy - INFO - Chain [1] start processing
10:13:35 - cmdstanpy - INFO - Chain [1] done processin

Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
10:24:22 - cmdstanpy - INFO - Chain [1] start processing
10:24:23 - cmdstanpy - INFO - Chain [1] start processing
10:24:30 - cmdstanpy - INFO - Chain [1] done processing
10:24:34 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
10:24:38 - cmdstanpy - INFO - Chain [1] start processing
10:24:44 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
                                                                                

+--------+---------+------------------+------------------+------------------+-----------------+
|    Time|     NOME|               MSE|              RMSE|               MAE|         Coverage|
+--------+---------+------------------+------------------+------------------+-----------------+
|  5 Days|  Viterbo| 8.875738924614224| 2.979217837724228| 2.048251154129947|94.93003731343283|
|  5 Days|    Rieti| 5.181367839374911| 2.276261812572295|1.5609797247766006| 95.1445895522388|
|  5 Days|   Latina|17.217351256519503| 4.149379623090602|2.8951821738924104|94.84141791044776|
|  5 Days|Frosinone|15.504303402042716|3.9375504316824586| 2.966626252226218|94.15578358208954|
|  5 Days|     Roma| 57.39852844343018| 7.576181653275625| 5.791052880110181|94.02985074626866|
| 30 Days|  Viterbo|  8.72059222818815|2.9530648872295626|2.0329940489450093|95.04990925589837|
| 30 Days|    Rieti| 5.100556286208062|   2.2584411186055|1.5495163157021552|95.28584392014518|
| 30 Days|   Latina| 17.02210312204571| 

In [None]:
result_df_NO2_pd = result_df_NO2.pandas_api()
result_df_NO2_pd

O3 

In [17]:
from pyspark.sql.functions import *
from time import time
import matplotlib.pyplot as plt
import os

data_O3_prophet = data_O3.groupBy("NOME", "original_date_time").agg(avg("c_O3").alias("avg_c_O3"))
data_O3_prophet.show()

data_O3_prophet_u = data_O3_prophet.withColumnRenamed("avg_c_O3", "y").withColumnRenamed("original_date_time", "ds")

data_O3_prophet = data_O3_prophet_u.withColumn("ds", to_timestamp("ds", "yyyy-MM-dd HH:mm:ss")) \
                                       .withColumn("ds", from_utc_timestamp("ds", "Europe/Rome"))

data_O3_prophet.printSchema()

def train_and_forecast(group):
    
    print(f"Processing group: {group['NOME'].iloc[0]} with {len(group)} data points")
    
    group["ds"] = pd.to_datetime(group["ds"])

    m = Prophet(seasonality_mode="additive",
                    yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=True,
                    interval_width=0.95)
    
    m.fit(group[group['ds'] <= cutoff_date])
    
    forecast = m.predict(group)
    
    path = '/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/O3/'

    if not os.path.exists(path):
        os.makedirs(path)
    
    try:
        fig1 = m.plot(forecast)
        fig1.suptitle(f"O3 Concentration Forecast for {group['NOME'].iloc[0]}")
        fig1.legend(['Actual', 'Forecasted'], loc='upper right')
        fig1.subplots_adjust(top=0.9)
        
        fig2 = m.plot_components(forecast)
        fig2.suptitle(f"O3 Concentration Component Forecast for {group['NOME'].iloc[0]}")
        fig2.subplots_adjust(top=0.9)

        fig1.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/O3/{group['NOME'].iloc[0]}_prophet_forecast.png")
        fig2.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/O3/{group['NOME'].iloc[0]}_prophet_components.png")
    except Exception as e:
        print(f"Error generating plots for group {group['NOME'].iloc[0]}: {str(e)}")
    
    
    f_pd = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','trend', 'yearly', 'weekly', 'daily']].set_index("ds")
 
    group_pd = group[['ds', 'NOME', 'y']].set_index("ds")
    
    result_pd = f_pd.join( group_pd, how = "left")
    result_pd.reset_index(level=0, inplace=True)
    
    result_pd['NOME'] = group['NOME'].iloc[0]
    
    return result_pd[['ds', 'NOME', 'y', 'yhat', 'yhat_upper', 'yhat_lower','trend', 'yearly', 'weekly', 'daily']] 

result_schema =StructType([
    StructField('ds',TimestampType()),
    StructField('NOME', StringType()),
    StructField('y', DoubleType()),
    StructField('yhat',FloatType()),
    StructField('yhat_upper',FloatType()),
    StructField('yhat_lower',FloatType()),
    StructField('trend',FloatType()),
    StructField('yearly',FloatType()),
    StructField('weekly',FloatType()),
    StructField('daily',FloatType())
  ])


# create train and test data based on cutoff date
cutoff_date = "2021-06-30 23:00:00"

data_O3_prophet_train = data_O3_prophet.filter(col('ds') <= cutoff_date)
data_O3_prophet_test = data_O3_prophet.filter((col('ds') > cutoff_date))

max_date = data_O3_prophet_test.agg(max('ds')).collect()[0][0]

period = data_O3_prophet_test.count()

# Start time
start_time = time()
# Train and forecast by ticker 
spark_forecast_O3 = data_O3_prophet.groupBy("NOME").applyInPandas(train_and_forecast, schema=result_schema)
# Processing time
print('The time used for the Spark forecast is ', time()-start_time)

spark_forecast_O3.show()


                                                                                

+-------+-------------------+------------------+
|   NOME| original_date_time|          avg_c_O3|
+-------+-------------------+------------------+
|Viterbo|2022-02-02 01:00:00| 63.41638380530973|
|  Rieti|2021-11-01 05:00:00|  64.2590848034682|
|   Roma|2021-12-12 02:00:00|56.866665243323446|
|  Rieti|2021-12-19 15:00:00| 57.12363293063586|
|  Rieti|2021-05-20 22:00:00|110.31992742774568|
|  Rieti|2021-10-08 15:00:00| 67.82324296531792|
|   Roma|2021-05-22 11:00:00| 105.9227925074184|
| Latina|2021-06-10 21:00:00|119.86570038571429|
| Latina|2021-12-13 00:00:00| 56.38928176785714|
|   Roma|2021-10-11 14:00:00| 64.54337313649849|
|Viterbo|2021-06-27 07:00:00| 78.74778460176996|
|   Roma|2021-10-11 19:00:00|58.406855623145425|
|Viterbo|2021-10-08 00:00:00| 58.10656114380535|
|  Rieti|2021-10-08 07:00:00| 54.55861143930636|
|Viterbo|2021-12-07 13:00:00| 56.05801870796459|
| Latina|2021-05-21 09:00:00|105.94103004285711|
|Viterbo|2021-11-29 20:00:00| 56.90135650884957|
|  Rieti|2021-03-09 

                                                                                

The time used for the Spark forecast is  0.0502011775970459


Processing group: Rieti with 29862 data points                      (0 + 1) / 1]
14:49:54 - cmdstanpy - INFO - Chain [1] start processing
14:50:15 - cmdstanpy - INFO - Chain [1] done processing


+-------------------+-----+------------------+---------+----------+----------+---------+----------+----------+-----------+
|                 ds| NOME|                 y|     yhat|yhat_upper|yhat_lower|    trend|    yearly|    weekly|      daily|
+-------------------+-----+------------------+---------+----------+----------+---------+----------+----------+-----------+
|2019-01-01 01:00:00|Rieti|37.189024745664746|39.143295|  57.49009| 20.793215| 70.55917|-22.094322|-2.2409546| -7.0805984|
|2019-01-01 02:00:00|Rieti|41.472894988439286|39.647747|  58.05195| 22.010616|70.553665|-22.103779|-2.2436514| -6.5584908|
|2019-01-01 03:00:00|Rieti| 48.15371236416185|40.249825| 58.108025| 22.330557| 70.54816|-22.113247|-2.2380228|  -5.947064|
|2019-01-01 04:00:00|Rieti|  51.5456828901734| 40.67378| 58.087936|  22.80754|70.542656|-22.122728|-2.2240274| -5.5221214|
|2019-01-01 05:00:00|Rieti| 52.66332410982661|40.856014| 58.827152|  21.43948| 70.53715|-22.132221|-2.2016857| -5.3472276|
|2019-01-01 06:0

Processing group: Viterbo with 29862 data points
                                                                                

In [18]:
from pyspark.sql.functions import col
from pyspark.sql.functions import pow, sqrt
from prophet.diagnostics import performance_metrics


# Calculate MAE
mae_O3 = spark_forecast_O3.select(col("NOME"), abs(col("y") - col("yhat")).alias("error")) \
                    .groupBy("NOME") \
                    .agg({"error": "mean"}) \
                    .withColumnRenamed("avg(error)", "MAE")


# Calculate MSE
mse_O3 = spark_forecast_O3.select(col("NOME"), pow(col("y") - col("yhat"), 2).alias("error")) \
                    .groupBy("NOME") \
                    .agg({"error": "mean"}) \
                    .withColumnRenamed("avg(error)", "MSE")

# Calculate RMSE
rmse_O3 = spark_forecast_O3.select(col("NOME"), pow(col("y") - col("yhat"), 2).alias("error")) \
                     .groupBy("NOME") \
                     .agg({"error": "mean"}) \
                     .withColumnRenamed("avg(error)", "MSE") \
                     .withColumn("RMSE", sqrt(col("MSE")).alias("RMSE"))

# Show the results
mae_O3.show()
mse_O3.show()
rmse_O3.show()

14:50:27 - cmdstanpy - INFO - Chain [1] start processing         (0 + 96) / 119]
14:50:44 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Frosinone with 29862 data points                  (0 + 2) / 2]
Processing group: Rieti with 29862 data points
14:51:08 - cmdstanpy - INFO - Chain [1] start processing
14:51:08 - cmdstanpy - INFO - Chain [1] start processing
14:51:28 - cmdstanpy - INFO - Chain [1] done processing
14:51:32 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
14:51:38 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
14:51:42 - cmdstanpy - INFO - Chain [1] start processing
14:51:50 - cmdstanpy - INFO - Chain [1] done processing
14:51:57 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
14:52:00 - cmdstanpy - INFO - Chain [1] start processing
14:52:14 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
  

+---------+------------------+
|     NOME|               MAE|
+---------+------------------+
|  Viterbo| 9.259814227136745|
|    Rieti|  8.39984605501204|
|   Latina| 9.545006004330402|
|Frosinone| 9.452385800198158|
|     Roma|11.060428338526552|
+---------+------------------+



Processing group: Rieti with 29862 data points
14:53:04 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
14:53:04 - cmdstanpy - INFO - Chain [1] start processing
14:53:26 - cmdstanpy - INFO - Chain [1] done processing
14:53:27 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
Processing group: Viterbo with 29862 data points
14:53:36 - cmdstanpy - INFO - Chain [1] start processing
14:53:37 - cmdstanpy - INFO - Chain [1] start processing
14:53:50 - cmdstanpy - INFO - Chain [1] done processing
14:53:53 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
14:53:59 - cmdstanpy - INFO - Chain [1] start processing
14:54:13 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
                                                                                

+---------+------------------+
|     NOME|               MSE|
+---------+------------------+
|  Viterbo| 142.4690505576954|
|    Rieti|118.42187160637249|
|   Latina| 148.2267387124007|
|Frosinone|150.69700327824347|
|     Roma|198.17619701425835|
+---------+------------------+



Processing group: Frosinone with 29862 data points
14:55:01 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
14:55:01 - cmdstanpy - INFO - Chain [1] start processing
14:55:22 - cmdstanpy - INFO - Chain [1] done processing
14:55:26 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
14:55:32 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
14:55:37 - cmdstanpy - INFO - Chain [1] start processing
14:55:45 - cmdstanpy - INFO - Chain [1] done processing
14:55:53 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
14:55:56 - cmdstanpy - INFO - Chain [1] start processing
14:56:08 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]


+---------+------------------+------------------+
|     NOME|               MSE|              RMSE|
+---------+------------------+------------------+
|  Viterbo| 142.4690505576954|11.936039986431656|
|    Rieti|118.42187160637249|10.882181380880052|
|   Latina| 148.2267387124007|12.174840397820445|
|Frosinone|150.69700327824347|12.275870774745206|
|     Roma|198.17619701425835|14.077506775500353|
+---------+------------------+------------------+



                                                                                

In [17]:
from pyspark.sql.functions import *
from time import time
import matplotlib.pyplot as plt
import os

data_O3_prophet = data_O3.groupBy("NOME", "original_date_time").agg(avg("c_O3").alias("avg_c_O3"))
#data_O3_prophet.show()

data_O3_prophet_u = data_O3_prophet.withColumnRenamed("avg_c_O3", "y").withColumnRenamed("original_date_time", "ds")

data_O3_prophet = data_O3_prophet_u.withColumn("ds", to_timestamp("ds", "yyyy-MM-dd HH:mm:ss")) \
                                       .withColumn("ds", from_utc_timestamp("ds", "Europe/Rome"))

#data_O3_prophet.printSchema()

def train_and_forecast(group):
    
    print(f"Processing group: {group['NOME'].iloc[0]} with {len(group)} data points")
    
    group["ds"] = pd.to_datetime(group["ds"])

    m = Prophet(seasonality_mode="additive",
                    yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=True,
                    interval_width=0.95)
    
    m.fit(group[group['ds'] <= cutoff_date])
    
    forecast = m.predict(group)
    
    path = '/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet_lazio/O3/'

    if not os.path.exists(path):
        os.makedirs(path)
    
   # try:
    #    fig1 = m.plot(forecast)
     #   fig1.suptitle(f"O3 Concentration Forecast for {group['NOME'].iloc[0]}")
     #   fig1.legend(['Actual', 'Forecasted'], loc='upper right')
      #  fig1.subplots_adjust(top=0.9)
        
       # fig2 = m.plot_components(forecast)
       # fig2.suptitle(f"O3 Concentration Component Forecast for {group['NOME'].iloc[0]}")
        #fig2.subplots_adjust(top=0.9)

       # fig1.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet/O3/{group['NOME'].iloc[0]}_prophet_forecast.png")
        #fig2.savefig(f"/afs/enea.it/por/user/nafis/PFS/por/Thesis/codes/prophet/O3/{group['NOME'].iloc[0]}_prophet_components.png")
    #except Exception as e:
     #   print(f"Error generating plots for group {group['NOME'].iloc[0]}: {str(e)}")
    
    
    f_pd = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper','trend', 'yearly', 'weekly', 'daily']].set_index("ds")
 
    group_pd = group[['ds', 'NOME', 'y']].set_index("ds")
    
    result_pd = f_pd.join( group_pd, how = "left")
    result_pd.reset_index(level=0, inplace=True)
    
    result_pd['NOME'] = group['NOME'].iloc[0]
    
    return result_pd[['ds', 'NOME', 'y', 'yhat', 'yhat_upper', 'yhat_lower','trend', 'yearly', 'weekly', 'daily']] 

result_schema =StructType([
    StructField('ds',TimestampType()),
    StructField('NOME', StringType()),
    StructField('y', DoubleType()),
    StructField('yhat',FloatType()),
    StructField('yhat_upper',FloatType()),
    StructField('yhat_lower',FloatType()),
    StructField('trend',FloatType()),
    StructField('yearly',FloatType()),
    StructField('weekly',FloatType()),
    StructField('daily',FloatType())
  ])


# create train and test data based on cutoff date
cutoff_date = "2021-06-30 23:00:00"

data_O3_prophet_train = data_O3_prophet.filter(col('ds') <= cutoff_date)
data_O3_prophet_test = data_O3_prophet.filter((col('ds') > cutoff_date))

max_date = data_O3_prophet_test.agg(max('ds')).collect()[0][0]

period = data_O3_prophet_test.count()

# Start time
start_time = time()
# Train and forecast by ticker 
spark_forecast_O3 = data_O3_prophet.groupBy("NOME").applyInPandas(train_and_forecast, schema=result_schema)
# Processing time
print('The time used for the Spark forecast is ', time()-start_time)

#spark_forecast_O3.show()




The time used for the Spark forecast is  0.043535470962524414


                                                                                

In [18]:
from pyspark.sql.functions import col, pow, sqrt

# Define cutoff dates
cutoff_dates = {
    "5 Days": "2021-07-05 23:00:00",
    "30 Days": "2021-07-30 23:00:00",
    "90 Days": "2021-09-28 23:00:00",
    "180 Days": "2021-12-27 23:00:00",
    "365 Days": "2022-06-29 23:00:00"
}

# Define a list to store the evaluation results
results = []

# Compute evaluation metrics for each time period and NOME
for period, cutoff in cutoff_dates.items():
    for nome in spark_forecast_O3.select("NOME").distinct().rdd.flatMap(lambda x: x).collect():
        # Filter the forecast data based on the cutoff date and NOME
        forecast_data_O3 = spark_forecast_O3.filter((col("ds") <= cutoff) & (col("NOME") == nome))

        # Calculate MAE
        mae = forecast_data_O3.select(abs(col("y") - col("yhat")).alias("error")) \
                             .agg({"error": "mean"}) \
                             .withColumnRenamed("avg(error)", "MAE") \
                             .collect()[0]["MAE"]

        # Calculate MSE
        mse = forecast_data_O3.select(pow(col("y") - col("yhat"), 2).alias("error")) \
                             .agg({"error": "mean"}) \
                             .withColumnRenamed("avg(error)", "MSE") \
                             .collect()[0]["MSE"]

        # Calculate RMSE
        rmse = forecast_data_O3.select(pow(col("y") - col("yhat"), 2).alias("error")) \
                              .agg({"error": "mean"}) \
                              .withColumnRenamed("avg(error)", "MSE") \
                              .withColumn("RMSE", sqrt(col("MSE")).alias("RMSE")) \
                              .collect()[0]["RMSE"]

        # Calculate coverage
        coverage = forecast_data_O3.filter((col("y") >= col("yhat_lower")) & (col("y") <= col("yhat_upper"))) \
                                  .count() / forecast_data_O3.count() * 100

        # Add the results to the list
        results.append((period, nome, mse, rmse, mae, coverage))

# Create a Spark DataFrame from the results list
result_df_O3 = spark.createDataFrame(results, ["Time", "NOME", "MSE", "RMSE", "MAE", "Coverage"])

# Show the results
result_df_O3.show()


Processing group: Frosinone with 29862 data points                  (0 + 2) / 2]
Processing group: Rieti with 29862 data points
10:26:50 - cmdstanpy - INFO - Chain [1] start processing
10:26:50 - cmdstanpy - INFO - Chain [1] start processing
10:27:09 - cmdstanpy - INFO - Chain [1] done processing
10:27:12 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
10:27:17 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
10:27:20 - cmdstanpy - INFO - Chain [1] start processing
10:27:27 - cmdstanpy - INFO - Chain [1] done processing             (0 + 2) / 2]
10:27:32 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
10:27:35 - cmdstanpy - INFO - Chain [1] start processing
10:27:46 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
10:28:22 - cmdstanpy - INFO - Chain [1] start processing       

10:40:44 - cmdstanpy - INFO - Chain [1] done processing
10:40:47 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
10:40:51 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
10:40:55 - cmdstanpy - INFO - Chain [1] start processing
10:41:01 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
10:41:09 - cmdstanpy - INFO - Chain [1] done processing
10:41:09 - cmdstanpy - INFO - Chain [1] start processing
10:41:22 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
10:41:58 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
10:41:58 - cmdstanpy - INFO - Chain [1] start processing
10:42:12 - cmdstanpy - INFO - Chain [1] done processing
10:42:15 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
10:42:20 - cmdstanpy - INFO - Ch

Processing group: Viterbo with 29862 data points
10:55:01 - cmdstanpy - INFO - Chain [1] start processing
10:55:11 - cmdstanpy - INFO - Chain [1] done processing
10:55:15 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
10:55:19 - cmdstanpy - INFO - Chain [1] start processing
10:55:30 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
10:56:09 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
10:56:09 - cmdstanpy - INFO - Chain [1] start processing
10:56:22 - cmdstanpy - INFO - Chain [1] done processing
10:56:26 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
10:56:30 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
10:56:34 - cmdstanpy - INFO - Chain [1] start processing
10:56:42 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29

Processing group: Roma with 29862 data points
11:09:17 - cmdstanpy - INFO - Chain [1] start processing
11:09:28 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Rieti with 29862 data points
11:10:04 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
11:10:04 - cmdstanpy - INFO - Chain [1] start processing
11:10:23 - cmdstanpy - INFO - Chain [1] done processing
11:10:26 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
11:10:31 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
11:10:34 - cmdstanpy - INFO - Chain [1] start processing
11:10:41 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
11:10:47 - cmdstanpy - INFO - Chain [1] done processing
11:10:49 - cmdstanpy - INFO - Chain [1] start processing
11:11:01 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing g

11:38:01 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
11:38:01 - cmdstanpy - INFO - Chain [1] start processing
11:38:17 - cmdstanpy - INFO - Chain [1] done processing
11:38:20 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
11:38:25 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
11:38:28 - cmdstanpy - INFO - Chain [1] start processing
11:38:34 - cmdstanpy - INFO - Chain [1] done processing
11:38:39 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
11:38:42 - cmdstanpy - INFO - Chain [1] start processing
11:38:54 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
11:39:30 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
11:39:30 - cmdstanpy - INFO - Chain [1] start processing
11:39:45 - cmdstanpy - INFO - Chain [1] done processin

Processing group: Latina with 29862 data points
11:52:10 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
11:52:12 - cmdstanpy - INFO - Chain [1] start processing
11:52:22 - cmdstanpy - INFO - Chain [1] done processing
11:52:26 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
11:52:30 - cmdstanpy - INFO - Chain [1] start processing
11:52:41 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
11:53:16 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
11:53:16 - cmdstanpy - INFO - Chain [1] start processing
11:53:32 - cmdstanpy - INFO - Chain [1] done processing
11:53:34 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
11:53:40 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
11:53:42 - cmdstanpy - INFO - Chain [1

12:06:08 - cmdstanpy - INFO - Chain [1] done processing
12:06:13 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
12:06:16 - cmdstanpy - INFO - Chain [1] start processing
12:06:26 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
12:07:02 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
12:07:02 - cmdstanpy - INFO - Chain [1] start processing
12:07:20 - cmdstanpy - INFO - Chain [1] done processing
12:07:25 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
12:07:28 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
12:07:33 - cmdstanpy - INFO - Chain [1] start processing
12:07:39 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
12:07:45 - cmdstanpy - INFO - Chain [1] done processing
12:07:47 - cmdstanpy - INFO - Chai

12:20:27 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
12:21:02 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
12:21:02 - cmdstanpy - INFO - Chain [1] start processing
12:21:19 - cmdstanpy - INFO - Chain [1] done processing
12:21:23 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
12:21:27 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
12:21:31 - cmdstanpy - INFO - Chain [1] start processing
12:21:38 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
12:21:46 - cmdstanpy - INFO - Chain [1] done processing
12:21:47 - cmdstanpy - INFO - Chain [1] start processing
12:21:58 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
12:22:35 - cmdstanpy - INFO - Chain [1] start processing    

12:35:04 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
12:35:04 - cmdstanpy - INFO - Chain [1] start processing
12:35:25 - cmdstanpy - INFO - Chain [1] done processing
12:35:26 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
12:35:33 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
12:35:34 - cmdstanpy - INFO - Chain [1] start processing
12:35:45 - cmdstanpy - INFO - Chain [1] done processing
12:35:48 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
12:35:53 - cmdstanpy - INFO - Chain [1] start processing
12:36:04 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
12:36:40 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
12:36:41 - cmdstanpy - INFO - Chain [1] start processing
12:36:59 - cmdstanpy - INFO - Chain [1] done processin

Processing group: Latina with 29862 data points
12:49:42 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
12:49:45 - cmdstanpy - INFO - Chain [1] start processing
12:49:52 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
12:49:58 - cmdstanpy - INFO - Chain [1] done processing
12:50:00 - cmdstanpy - INFO - Chain [1] start processing
12:50:11 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
12:50:48 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
12:50:48 - cmdstanpy - INFO - Chain [1] start processing
12:51:06 - cmdstanpy - INFO - Chain [1] done processing
12:51:10 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
12:51:15 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
12:51:18 - cmdstanpy - INFO - Chain [1

13:04:05 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
13:04:11 - cmdstanpy - INFO - Chain [1] done processing
13:04:12 - cmdstanpy - INFO - Chain [1] start processing
13:04:23 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
13:05:02 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
13:05:02 - cmdstanpy - INFO - Chain [1] start processing
13:05:21 - cmdstanpy - INFO - Chain [1] done processing
13:05:24 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
13:05:29 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
13:05:32 - cmdstanpy - INFO - Chain [1] start processing
13:05:39 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
13:05:47 - cmdstanpy - INFO - Chain [1] done processing
13:05:48 - cmdstanpy - INFO - Chai

13:18:12 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
13:18:48 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
13:18:48 - cmdstanpy - INFO - Chain [1] start processing
13:19:03 - cmdstanpy - INFO - Chain [1] done processing
13:19:07 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
13:19:11 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
13:19:14 - cmdstanpy - INFO - Chain [1] start processing
13:19:22 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
13:19:29 - cmdstanpy - INFO - Chain [1] done processing
13:19:30 - cmdstanpy - INFO - Chain [1] start processing
13:19:41 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Rieti with 29862 data points=>                   (7 + 4) / 11]
Processing group: Frosinone wi

13:32:38 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
13:32:38 - cmdstanpy - INFO - Chain [1] start processing
13:32:56 - cmdstanpy - INFO - Chain [1] done processing
13:32:59 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
13:33:04 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
13:33:07 - cmdstanpy - INFO - Chain [1] start processing
13:33:16 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
13:33:22 - cmdstanpy - INFO - Chain [1] done processing
13:33:24 - cmdstanpy - INFO - Chain [1] start processing
13:33:35 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
Processing group: Frosinone with 29862 data points
13:34:13 - cmdstanpy - INFO - Chain [1] start processing            (0 + 2) / 2]
13:34:13 - cmdstanpy - INFO - Chain [1] start processing
13:34:31 - cmdstanpy - INFO - Chain [1] done processin

13:47:00 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Latina with 29862 data points
13:47:06 - cmdstanpy - INFO - Chain [1] start processing
Processing group: Viterbo with 29862 data points
13:47:08 - cmdstanpy - INFO - Chain [1] start processing
13:47:17 - cmdstanpy - INFO - Chain [1] done processing
Processing group: Roma with 29862 data points
13:47:24 - cmdstanpy - INFO - Chain [1] done processing
13:47:25 - cmdstanpy - INFO - Chain [1] start processing
13:47:37 - cmdstanpy - INFO - Chain [1] done processing             (1 + 1) / 2]
                                                                                

+--------+---------+------------------+------------------+------------------+-----------------+
|    Time|     NOME|               MSE|              RMSE|               MAE|         Coverage|
+--------+---------+------------------+------------------+------------------+-----------------+
|  5 Days|  Viterbo| 89.89646233747999| 9.481374496215198| 7.482670470480355|94.49626865671642|
|  5 Days|    Rieti| 83.45036222405513| 9.135116979221182| 7.090191673670943| 94.5195895522388|
|  5 Days|   Latina|117.87761470859238|10.857145790150945| 8.534576418347756|94.68283582089553|
|  5 Days|Frosinone| 99.38728900784695| 9.969317379231486| 7.790755661968563|94.66417910447761|
|  5 Days|     Roma| 145.5025510482712|12.062443825704275| 9.616992576412443|95.06996268656717|
| 30 Days|  Viterbo| 92.35015141139486| 9.609898616083047| 7.582695747373783|94.22867513611615|
| 30 Days|    Rieti| 86.53355662257624| 9.302341459147597|7.2149940514889455|94.03357531760436|
| 30 Days|   Latina|121.00364995113667|1

In [19]:
result_df_O3.show(100)

+--------+---------+------------------+------------------+------------------+-----------------+
|    Time|     NOME|               MSE|              RMSE|               MAE|         Coverage|
+--------+---------+------------------+------------------+------------------+-----------------+
|  5 Days|  Viterbo| 89.89646233747999| 9.481374496215198| 7.482670470480355|94.49626865671642|
|  5 Days|    Rieti| 83.45036222405513| 9.135116979221182| 7.090191673670943| 94.5195895522388|
|  5 Days|   Latina|117.87761470859238|10.857145790150945| 8.534576418347756|94.68283582089553|
|  5 Days|Frosinone| 99.38728900784695| 9.969317379231486| 7.790755661968563|94.66417910447761|
|  5 Days|     Roma| 145.5025510482712|12.062443825704275| 9.616992576412443|95.06996268656717|
| 30 Days|  Viterbo| 92.35015141139486| 9.609898616083047| 7.582695747373783|94.22867513611615|
| 30 Days|    Rieti| 86.53355662257624| 9.302341459147597|7.2149940514889455|94.03357531760436|
| 30 Days|   Latina|121.00364995113667|1

In [14]:
spark.stop()