In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=4a7051fabf6c2f3771642cbe99870d1e75bdff4c338b2ab7f8ba1de288545140
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [3]:
# Download the dataset
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00360/AirQualityUCI.zip
!unzip AirQualityUCI.zip


--2024-10-19 06:29:51--  https://archive.ics.uci.edu/ml/machine-learning-databases/00360/AirQualityUCI.zip
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified
Saving to: ‘AirQualityUCI.zip’

AirQualityUCI.zip       [   <=>              ]   1.47M  3.21MB/s    in 0.5s    

2024-10-19 06:29:52 (3.21 MB/s) - ‘AirQualityUCI.zip’ saved [1543989]

Archive:  AirQualityUCI.zip
  inflating: AirQualityUCI.csv       
  inflating: AirQualityUCI.xlsx      


In [6]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np

In [7]:
# Initialize Spark session
spark = SparkSession.builder.appName("TimeSeriesAnalysis").getOrCreate()

# Load the dataset into Pandas first for quick inspection
df_pandas = pd.read_csv('AirQualityUCI.csv', sep=';', decimal=',', parse_dates=[['Date', 'Time']])

# Preview the dataset
df_pandas.head()

# Remove unnecessary columns and fix the data types
df_pandas = df_pandas[['Date_Time', 'CO(GT)', 'PT08.S1(CO)', 'NMHC(GT)', 'C6H6(GT)', 'PT08.S2(NMHC)', 'NOx(GT)', 'PT08.S3(NOx)']]
df_pandas = df_pandas.dropna()
# Replace negative values with NaN
df_pandas['CO(GT)'] = df_pandas['CO(GT)'].apply(lambda x: np.nan if x < 0 else x)

# You can then apply interpolation to fill missing values
df_pandas['CO(GT)'] = df_pandas['CO(GT)'].interpolate()



  df_pandas = pd.read_csv('AirQualityUCI.csv', sep=';', decimal=',', parse_dates=[['Date', 'Time']])
  df_pandas = pd.read_csv('AirQualityUCI.csv', sep=';', decimal=',', parse_dates=[['Date', 'Time']])


In [8]:
# Rename columns to replace dots with underscores
df_pandas.columns = [col.replace('.', '_') for col in df_pandas.columns]

# Convert to Spark DataFrame
data = spark.createDataFrame(df_pandas)

# Show the renamed columns
data.show(5)


+-------------------+------+-----------+--------+--------+-------------+-------+------------+
|          Date_Time|CO(GT)|PT08_S1(CO)|NMHC(GT)|C6H6(GT)|PT08_S2(NMHC)|NOx(GT)|PT08_S3(NOx)|
+-------------------+------+-----------+--------+--------+-------------+-------+------------+
|10/03/2004 18.00.00|   2.6|     1360.0|   150.0|    11.9|       1046.0|  166.0|      1056.0|
|10/03/2004 19.00.00|   2.0|     1292.0|   112.0|     9.4|        955.0|  103.0|      1174.0|
|10/03/2004 20.00.00|   2.2|     1402.0|    88.0|     9.0|        939.0|  131.0|      1140.0|
|10/03/2004 21.00.00|   2.2|     1376.0|    80.0|     9.2|        948.0|  172.0|      1092.0|
|10/03/2004 22.00.00|   1.6|     1272.0|    51.0|     6.5|        836.0|  131.0|      1205.0|
+-------------------+------+-----------+--------+--------+-------------+-------+------------+
only showing top 5 rows



In [9]:
# Create a window specification for lagging the time series data
window = Window.orderBy("Date_Time")

# Generate lag features using backticks around column names with special characters
data_with_lags = data.withColumn("lag_1", lag(col("`CO(GT)`"), 1).over(window)) \
                     .withColumn("lag_2", lag(col("`CO(GT)`"), 2).over(window)) \
                     .withColumn("lag_3", lag(col("`CO(GT)`"), 3).over(window)) \
                     .withColumn("lag_4", lag(col("`CO(GT)`"), 4).over(window)) \
                     .withColumn("lag_5", lag(col("`CO(GT)`"), 5).over(window))

# Drop rows with missing lag values
data_clean = data_with_lags.dropna()

# Show the cleaned data with lag features
data_clean.show(5)



+-------------------+------+-----------+--------+--------+-------------+-------+------------+-----+-----+-----+-----+-----------------+
|          Date_Time|CO(GT)|PT08_S1(CO)|NMHC(GT)|C6H6(GT)|PT08_S2(NMHC)|NOx(GT)|PT08_S3(NOx)|lag_1|lag_2|lag_3|lag_4|            lag_5|
+-------------------+------+-----------+--------+--------+-------------+-------+------------+-----+-----+-----+-----+-----------------+
|01/01/2005 05.00.00|   1.4|     1004.0|  -200.0|     4.8|        753.0|  181.0|       879.0|  1.9|  2.7|  2.5|  1.6|1.595876288659794|
|01/01/2005 06.00.00|   1.5|     1001.0|  -200.0|     5.3|        777.0|  171.0|       859.0|  1.4|  1.9|  2.7|  2.5|              1.6|
|01/01/2005 07.00.00|   1.4|      974.0|  -200.0|     4.5|        736.0|  168.0|       888.0|  1.5|  1.4|  1.9|  2.7|              2.5|
|01/01/2005 08.00.00|   1.1|      915.0|  -200.0|     3.0|        653.0|  169.0|       973.0|  1.4|  1.5|  1.4|  1.9|              2.7|
|01/01/2005 09.00.00|   1.0|      939.0|  -200.0

In [10]:
# Assemble the features for scaling
assembler = VectorAssembler(inputCols=["lag_1", "lag_2", "lag_3", "lag_4", "lag_5"], outputCol="features")

# Scale the features
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
training_data = assembler.transform(data_clean)
scaler_model = scaler.fit(training_data)
scaled_data = scaler_model.transform(training_data)


In [11]:
# Initialize the Decision Tree Regressor model
dt = DecisionTreeRegressor(featuresCol="scaled_features", labelCol="CO(GT)")

# Train the model
dt_model = dt.fit(scaled_data)

# Make predictions
predictions = dt_model.transform(scaled_data)

# Show predictions
predictions.select("Date_Time", "`CO(GT)`", "prediction").show(5)



+-------------------+------+------------------+
|          Date_Time|CO(GT)|        prediction|
+-------------------+------+------------------+
|01/01/2005 05.00.00|   1.4| 1.908670011325342|
|01/01/2005 06.00.00|   1.5|1.5199851487067881|
|01/01/2005 07.00.00|   1.4|1.5199851487067881|
|01/01/2005 08.00.00|   1.1|1.5199851487067881|
|01/01/2005 09.00.00|   1.0| 1.179554701734488|
+-------------------+------+------------------+
only showing top 5 rows



In [12]:
# Initialize the evaluator
evaluator = RegressionEvaluator(labelCol="CO(GT)", predictionCol="prediction", metricName="rmse")

# Compute RMSE (Root Mean Squared Error)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")


Root Mean Squared Error (RMSE): 0.7121596162653802
