# Step 1: Load the required libraries

In [73]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [74]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np



# Step 2: Load the data

In [75]:
spark = SparkSession.builder.appName("AirQuality").getOrCreate()

city_day = spark.read.csv("city_day.csv", header=True, inferSchema=True)
city_hour = spark.read.csv("city_hour.csv", header=True, inferSchema=True)
station_day = spark.read.csv("station_day.csv", header=True, inferSchema=True)
station_hour = spark.read.csv("station_hour.csv", header=True, inferSchema=True)
stations = spark.read.csv("stations.csv", header=True, inferSchema=True)


# Step 3: Data Cleaning

In [76]:
# Load the data into DataFrames
city_day = spark.read.csv("city_day.csv", header=True, inferSchema=True)
city_hour = spark.read.csv("city_hour.csv", header=True, inferSchema=True)
station_day = spark.read.csv("station_day.csv", header=True, inferSchema=True)
station_hour = spark.read.csv("station_hour.csv", header=True, inferSchema=True)
stations = spark.read.csv("stations.csv", header=True, inferSchema=True)

# Drop the PM2.5 column from all DataFrames except stations
city_day = city_day.drop("PM2.5")
city_hour = city_hour.drop("PM2.5")
station_day = station_day.drop("PM2.5")
station_hour = station_hour.drop("PM2.5")

# Remove any missing or duplicate values
city_day = city_day.dropna().dropDuplicates()
city_hour = city_hour.dropna().dropDuplicates()
station_day = station_day.dropna().dropDuplicates()
station_hour = station_hour.dropna().dropDuplicates()
stations = stations.dropna().dropDuplicates()


# Step 4: Data Exploration

In [77]:
# Check the schema of the datasets
city_day.printSchema()
city_hour.printSchema()
station_day.printSchema()
station_hour.printSchema()
stations.printSchema()

# Show the first few rows of the datasets
city_day.show(5)
city_hour.show(5)
station_day.show(5)
station_hour.show(5)
stations.show(5)

# Get the summary statistics of the datasets
city_day.describe().show()
city_hour.describe().show()
station_day.describe().show()
station_hour.describe().show()


root
 |-- City: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- PM10: double (nullable = true)
 |-- NO: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- NOx: double (nullable = true)
 |-- NH3: double (nullable = true)
 |-- CO: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- O3: double (nullable = true)
 |-- Benzene: double (nullable = true)
 |-- Toluene: double (nullable = true)
 |-- Xylene: double (nullable = true)
 |-- AQI: double (nullable = true)
 |-- AQI_Bucket: string (nullable = true)

root
 |-- City: string (nullable = true)
 |-- Datetime: timestamp (nullable = true)
 |-- PM10: double (nullable = true)
 |-- NO: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- NOx: double (nullable = true)
 |-- NH3: double (nullable = true)
 |-- CO: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- O3: double (nullable = true)
 |-- Benzene: double (nullable = true)
 |-- Toluene: double (nullable = true)
 |-- Xylene: 

# Step 5: Data Aggregation

In [78]:
# Compute average values by city and date
from pyspark.sql.functions import avg, sum

city_day_agg = city_day.groupBy("City", "Date") \
    .agg(avg("PM10").alias("avg_PM10"), 
         avg("NO2").alias("avg_NO2"), 
         avg("SO2").alias("avg_SO2"), 
         avg("CO").alias("avg_CO"), 
         avg("O3").alias("avg_O3"))
         
city_day_agg.show()


+-------------+----------+--------+-------+-------+------+------+
|         City|      Date|avg_PM10|avg_NO2|avg_SO2|avg_CO|avg_O3|
+-------------+----------+--------+-------+-------+------+------+
|     Amritsar|2019-05-13|  142.79|  12.11|  26.99|  1.03| 19.67|
|     Amritsar|2019-12-25|    76.1|  10.53|   6.27|  0.74| 48.65|
|        Delhi|2020-07-01|  128.66|  21.05|  11.15|  0.97| 29.73|
|     Gurugram|2020-06-01|   87.51|   20.5|   5.42|  0.85| 72.06|
|Visakhapatnam|2018-04-20|   69.43|  24.01|  16.12|  0.57| 11.16|
|      Kolkata|2019-07-30|   43.61|  23.09|   2.65|   0.5| 17.36|
|    Amaravati|2020-02-22|   54.16|  12.98|   20.2|  0.48| 39.74|
|    Hyderabad|2019-08-08|    31.7|   19.4|   5.75|  0.42| 14.47|
|    Hyderabad|2019-03-01|  129.12|   45.3|   9.83|   0.6| 26.37|
|    Hyderabad|2019-02-21|   89.56|  35.72|   4.09|  0.54| 27.44|
|   Chandigarh|2020-03-23|   38.24|  10.02|  11.08|  0.54| 15.03|
|        Delhi|2019-06-09|  265.12|  62.84|  21.74|  1.86| 69.14|
|    Hyder

# Step 6: Data Visualization

In [79]:
# Calculate daily and monthly averages for NO2 parameter
no2_daily = city_day.groupBy(['City', 'Date']).agg(avg('NO2').alias('avg_NO2'))
no2_monthly = no2_daily.agg(avg('avg_NO2').alias('avg_monthly_NO2'))

# Show the results
no2_daily.show()
no2_monthly.show()


+-------------+----------+-------+
|         City|      Date|avg_NO2|
+-------------+----------+-------+
|     Amritsar|2019-05-13|  12.11|
|     Amritsar|2019-12-25|  10.53|
|        Delhi|2020-07-01|  21.05|
|     Gurugram|2020-06-01|   20.5|
|Visakhapatnam|2018-04-20|  24.01|
|      Kolkata|2019-07-30|  23.09|
|    Amaravati|2020-02-22|  12.98|
|    Hyderabad|2019-08-08|   19.4|
|    Hyderabad|2019-03-01|   45.3|
|    Hyderabad|2019-02-21|  35.72|
|   Chandigarh|2020-03-23|  10.02|
|        Delhi|2019-06-09|  62.84|
|    Hyderabad|2020-04-18|  25.01|
|        Delhi|2020-01-19|  34.18|
|        Delhi|2015-04-11|  59.52|
|    Amaravati|2019-11-08|  20.13|
|        Delhi|2019-10-09|   45.5|
|   Chandigarh|2020-02-22|  16.27|
|Visakhapatnam|2016-08-13|  41.82|
|    Amaravati|2019-12-05|  22.64|
+-------------+----------+-------+
only showing top 20 rows

+------------------+
|   avg_monthly_NO2|
+------------------+
|31.478126385809436|
+------------------+



In [108]:
import plotly.express as px

# Convert PySpark dataframe to Pandas dataframe
no2_daily_pd = no2_daily.toPandas()
no2_monthly_pd = no2_monthly.toPandas()

# Plot daily average NO2
fig = px.line(no2_daily_pd, x='Date', y='avg_NO2', color='City', title='Daily Average NO2')
fig.show()




# Step 7: Data Analysis 

In [80]:
# Perform time-series analysis and forecasting
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Prepare the data for modeling
vectorAssembler = VectorAssembler(inputCols=["PM10", "NO2", "SO2", "CO", "O3"], outputCol="features")
lr_df = station_hour.filter(station_hour.StationId == "DL001").select("Datetime", "PM10", "NO2", "SO2", "CO", "O3")
lr_df = vectorAssembler.transform(lr_df)
lr_df = lr_df.select("Datetime", "PM10", "features")
lr_df = lr_df.withColumnRenamed("PM10", "label")

# Train the model
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(lr_df)

# Make predictions
lr_predictions = lr_model.transform(lr_df)

# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator

lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
rmse = lr_evaluator.evaluate(lr_predictions)
print("RMSE:", rmse)


RMSE: 0.33102431599212


# Step 8: Model Building

In [81]:
# Build a forecasting model using Spark MLlib
from pyspark.ml.regression import RandomForestRegressor

# Prepare the data for modeling
rf_df = station_day.filter(station_day.StationId == "DL001").select("Date","PM10", "NO2", "SO2", "CO", "O3")
rf_df = vectorAssembler.transform(rf_df)
rf_df = rf_df.select("Date", "PM10", "features")
rf_df = rf_df.withColumnRenamed("PM10", "label")

# Split the data into training and testing datasets
train_data, test_data = rf_df.randomSplit([0.7, 0.3])

# Train the model
rf = RandomForestRegressor(numTrees=10, maxDepth=5, seed=42)
rf_model = rf.fit(train_data)

# Make predictions
rf_predictions = rf_model.transform(test_data)

# Evaluate the model
rf_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
rmse = rf_evaluator.evaluate(rf_predictions)
print("RMSE:", rmse)


RMSE: 45.49022525951996


# Step 9: Model Evaluation

In [82]:
# Evaluate the performance of the model using various evaluation metrics
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
rmse = evaluator.evaluate(rf_predictions)
print("RMSE:", rmse)

mae = evaluator.evaluate(rf_predictions, {evaluator.metricName: "mae"})
print("MAE:", mae)

r2 = evaluator.evaluate(rf_predictions, {evaluator.metricName: "r2"})
print("R-squared:", r2)


RMSE: 45.49022525951996
MAE: 24.094200423463466
R-squared: 0.8816134162414029


## Step 10: Model Deployment

In [111]:
!rm -r air_quality_forecast_model/*
import os

os.rmdir("air_quality_forecast_model")


In [112]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler

# Save the model
rf_model.save("air_quality_forecast_model")

# Load the model
rf_model = RandomForestRegressor.load("air_quality_forecast_model")

# Make predictions on new data
new_data = spark.read.csv("new_data.csv", header=True, inferSchema=True)
vectorAssembler = VectorAssembler(inputCols=["PM10", "NO2", "SO2", "CO", "O3"], outputCol="features")
new_data = vectorAssembler.transform(new_data)
new_data = new_data.select("Date", "features")

forecast = rf_model.transform(new_data)

# Show the predicted values
forecast.show()


Py4JJavaError: ignored