In [1]:
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import col
import os
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import mean
from pyspark.sql.functions import when
from pyspark.sql.functions import lag
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
import sys
from pyspark.sql.functions import year, month, dayofmonth
import seaborn as sns
import matplotlib.pyplot as plt

# Create (or reuse) the notebook's Spark session. The raised
# maxToStringFields limit applies to Spark's debug string output for
# wide schemas.
spark = (
    SparkSession.builder
    .appName("AirQuality")
    .config("spark.sql.debug.maxToStringFields", 1000)
    .getOrCreate()
)

: 

# Preparing Data

In [None]:
# Root directory of the dataset; CSV paths in this notebook are built as Dir + <file name>.
Dir = '/kaggle/input/time-series-air-quality-data-of-india-2010-2023/'

: 

In [None]:
station = 'stations_info.csv'

# Collect the measurement files from the dataset directory itself: every
# file is later opened as Dir + name, so names found anywhere else under
# /kaggle/input could not be read anyway. Only CSV files are kept, and the
# list is sorted so that names[0] is the same file on every run.
names = sorted(
    entry
    for entry in os.listdir(Dir)
    if entry.lower().endswith('.csv') and entry != station
)

: 

#### Stations Data

In [None]:
# Load the station metadata CSV (header row, inferred column types)
# and print the resulting schema.
stations_path = Dir + station
stations = spark.read.csv(stations_path, header=True, inferSchema=True)
stations.printSchema()

: 

In [None]:
# Keep only the metadata columns that are used below: the file name used
# for matching plus the four attributes copied onto each measurement row.
station_columns = ['file_name', 'state', 'city', 'start_month_num', 'start_year']
stations = stations.select(*station_columns)
stations.printSchema()

: 

#### Atmosphere Data

In [None]:
# Load the first measurement file and tag every row with its station's metadata.
first_name = names[0]
data = spark.read.csv(Dir + first_name, header=True, inferSchema=True)

# The station is identified by the file name without its extension.
# Fetch the matching row once (each .first() call is a separate Spark job)
# and read the fields by name instead of by position.
station_row = stations.filter(
    stations.file_name == os.path.splitext(first_name)[0]
).first()
if station_row is None:
    raise ValueError(f"No entry for {first_name!r} found in {station}")

for meta_col in ('state', 'city', 'start_month_num', 'start_year'):
    data = data.withColumn(meta_col, lit(station_row[meta_col]))

: 

In [None]:
# Append the remaining measurement files to `data`, tagging each with its
# station metadata first.
meta_cols = ('state', 'city', 'start_month_num', 'start_year')

# The station table is collected once into a dict so the loop does not
# launch a Spark job per file just to look up one row.
station_lookup = {row['file_name']: row for row in stations.collect()}
skipped = []

for name in names[1:]:
    # Stations are keyed by the file name without its extension.
    station_row = station_lookup.get(os.path.splitext(name)[0])
    if station_row is None:
        skipped.append((name, 'no station metadata'))
        continue

    df = spark.read.csv(Dir + name, header=True, inferSchema=True)
    for meta_col in meta_cols:
        df = df.withColumn(meta_col, lit(station_row[meta_col]))

    # Only merge files that have exactly the same columns. A bare column-count
    # check combined with positional union() would silently put values into
    # the wrong columns when names or their order differ between files.
    same_columns = (
        len(df.columns) == len(data.columns)
        and set(df.columns) == set(data.columns)
    )
    if not same_columns:
        skipped.append((name, 'column mismatch'))
        continue

    data = data.unionByName(df)

# Report what was left out instead of dropping it silently.
for name, reason in skipped:
    print(f"Skipped {name}: {reason}")


: 

# Preprocessing

#### What are the types of columns?

In [None]:
# Print the column names and types of the combined measurement DataFrame.
data.printSchema()

: 

#### Renaming columns, because names such as `PM2.5 (ug/m3)` cause errors in various operations

In [None]:

# Original CSV header -> short snake_case name used in the rest of the notebook.
# NOTE(review): withColumnRenamed does nothing when the old name is absent, so
# a key that does not match a real header is ignored without any error.
# Please verify 'CO (ug/m3)' and 'Toluene ()' against the actual CSV headers.
column_rename_mapping = {
    "From Date": "from_date",
    "To Date": "to_date",
    "PM2.5 (ug/m3)": "pm2_5",
    "PM10 (ug/m3)": "pm10",
    "NO (ug/m3)": "no",
    "NO2 (ug/m3)": "no2",
    "NOx (ppb)": "nox",
    "NH3 (ug/m3)": "nh3",
    "SO2 (ug/m3)": "so2",
    "CO (ug/m3)": "co",
    "Ozone (ug/m3)": "ozone",
    "Benzene (ug/m3)": "benzene",
    "Toluene ()": "toluene",
    "Temp (degree C)": "temp_c",
    "RH (%)": "rh_percent",
    "WS (m/s)": "ws_m_s",
    "WD (degree)": "wd_degree",
    "SR (W/mt2)": "sr_w_mt2",
    "BP (mmHg)": "bp_mmhg",
    "VWS (m/s)": "vws_m_s",
    "AT (degree C)": "at_c",
    "RF (mm)": "rf_mm",
    "state": "state",
    "city": "city",
    "start_month_num": "start_month_num",
    "start_year": "start_year",
}

# Apply the renames one at a time, in mapping order.
for source_header in column_rename_mapping:
    data = data.withColumnRenamed(source_header, column_rename_mapping[source_header])

: 

#### How large is the data?

In [None]:
# Number of rows in the combined DataFrame (this triggers a Spark job over all loaded files).
data.count()

: 

#### Are there any null values?

In [None]:
# Count the nulls of every column in ONE aggregation instead of running a
# separate filter + count job (a full scan of the data) per column.
# isNull() yields a boolean; cast to int so it can be summed.
null_row = data.select(
    [spark_sum(col(column).isNull().cast("int")).alias(column) for column in data.columns]
).first()

# sum() over an empty DataFrame is null, so fall back to 0 in that case.
null_counts = {column: (null_row[column] or 0) for column in data.columns}

for column, count in null_counts.items():
    print(f"{column} : {count} null values")


: 

#### Filling null values with mean

In [None]:
from pyspark.sql.functions import mean
cols = ['pm2_5', 'pm10', 'no', 'no2', 'nox', 'nh3', 'so2', 'co', 'ozone','benzene', 'toluene', 'temp_c', 'rh_percent', 'ws_m_s', 'wd_degree',
                  'sr_w_mt2', 'bp_mmhg', 'vws_m_s', 'at_c', 'rf_mm']

# Compute all column means in a single aggregation. The iteration variable is
# deliberately NOT called `col`: a top-level `for col in ...` would rebind the
# imported pyspark `col` function to a string and break every later col(...) call.
mean_row = data.select([mean(name).alias(name) for name in cols]).first()

# A column with no non-null values has no mean; leave it untouched rather
# than passing None to fillna.
fill_values = {name: mean_row[name] for name in cols if mean_row[name] is not None}

if fill_values:
    data = data.fillna(fill_values)


: 

# Outliers

In [None]:
def OutlierCount(numerical_features):
    """Print approximate quartiles, IQR and the IQR-rule outlier count per column.

    A value counts as an outlier when it lies outside
    [Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]. Quartiles come from
    ``approxQuantile`` with a relative error of 0.05, so all numbers are
    approximate. Operates on the notebook-level ``data`` DataFrame.

    Parameters
    ----------
    numerical_features : list
        Column names of ``data`` to analyse. ``None`` entries are ignored.

    Returns
    -------
    None
        Results are printed.
    """
    # Local import so the function keeps working even if a notebook cell has
    # rebound the global name `col` (e.g. by using it as a loop variable).
    from pyspark.sql import functions as F

    features = [name for name in numerical_features if name is not None]
    if not features:
        print("Outlier count: no features given")
        return

    # For a list of columns, approxQuantile returns ONE list per column,
    # i.e. [[q1_a, q3_a], [q1_b, q3_b], ...] -- not [all_q1, all_q3].
    quantiles = data.approxQuantile(features, [0.25, 0.75], 0.05)
    print("Quantiles:", quantiles)

    iqr = {}
    fences = {}
    for name, feature_quantiles in zip(features, quantiles):
        if len(feature_quantiles) != 2:
            # No quantiles available for this column; skip it.
            continue
        q1, q3 = feature_quantiles
        iqr[name] = q3 - q1
        fences[name] = (q1 - 1.5 * iqr[name], q3 + 1.5 * iqr[name])
    print("IQR:", iqr)

    print("Outlier count:")
    if not fences:
        return

    # Count in Spark rather than collecting every row to the driver.
    # between() is True for INLIERS, hence the negation.
    counts = data.select([
        F.sum((~F.col(name).between(low, high)).cast("int")).alias(name)
        for name, (low, high) in fences.items()
    ]).first()
    for name in fences:
        print(f"{name} : {counts[name] or 0}")


: 

In [None]:
# Report outliers for the pollutant columns, two columns per call.
numerical_features = ['no2', 'so2','co', 'ozone','pm2_5', 'pm10']
for start in range(0, len(numerical_features), 2):
    # Slicing yields a shorter final chunk for an odd-length list instead of
    # padding it with None, which is not a valid column name.
    OutlierCount(numerical_features[start:start + 2])


: 

# Feature Engineering

#### Creating the output column (AQI) using a breakpoint formula

In [None]:
def calculate_aqi(pm25, pm10, no2, so2, co, ozone):
    """Return the overall AQI for one set of pollutant concentrations.

    Each pollutant is converted to a sub-index with
    ``calculate_aqi_component`` using its own breakpoint table, and the
    largest sub-index (via ``greatest_aqi``) is returned.

    NOTE(review): the breakpoint tables are hard-coded here; confirm that
    their units match the units of the columns passed in.
    """
    # All six pollutants are mapped onto the same sub-index scale.
    aqi_scale = [0, 50, 100, 150, 200, 300, 400, 500]

    # (concentration, concentration breakpoints) for every pollutant.
    readings = (
        (pm25, [0, 12.1, 35.5, 55.5, 150.5, 250.5, 350.5, 500.5]),
        (pm10, [0, 54, 154, 254, 354, 424, 504, 604]),
        (no2, [0, 54, 101, 361, 650, 1250, 1650, 2050]),
        (so2, [0, 35, 75, 185, 304, 604, 804, 1004]),
        (co, [0, 4.5, 9.5, 12.5, 15.5, 30.5, 40.5, 50.5]),
        (ozone, [0, 55, 71, 86, 106, 200, 270, 360]),
    )

    sub_indices = [
        calculate_aqi_component(concentration, breakpoints, aqi_scale)
        for concentration, breakpoints in readings
    ]

    # The overall index is driven by the worst pollutant.
    return greatest_aqi(*sub_indices)

def calculate_aqi_component(value, breakpoints, aqi_ranges):
    """Linearly interpolate a pollutant concentration onto the AQI scale.

    Parameters
    ----------
    value : float or None
        Pollutant concentration.
    breakpoints : sequence of numbers
        Ascending concentration breakpoints.
    aqi_ranges : sequence of numbers
        AQI value at each breakpoint (same length as ``breakpoints``).

    Returns
    -------
    float or None
        ``None`` when ``value`` is ``None``; ``aqi_ranges[0]`` when ``value``
        is at or below the first breakpoint; otherwise the value interpolated
        on the segment whose upper breakpoint is the first one >= ``value``.
        Values above the last breakpoint are extrapolated along the last
        segment (not capped).
    """
    if value is None:
        # Missing measurement: no sub-index can be computed.
        return None

    # Index of the first breakpoint that is >= value; if there is none, the
    # value lies above the table and the last segment is used.
    upper = next(
        (i for i, limit in enumerate(breakpoints) if value <= limit),
        len(breakpoints) - 1,
    )

    if upper == 0:
        # At or below the bottom of the table. Without this guard `upper - 1`
        # would be -1 and the interpolation would run between the LAST and
        # FIRST breakpoints, producing negative indices for negative input.
        return float(aqi_ranges[0])

    lower = upper - 1
    slope = (aqi_ranges[upper] - aqi_ranges[lower]) / (breakpoints[upper] - breakpoints[lower])
    return slope * (value - breakpoints[lower]) + aqi_ranges[lower]

def greatest_aqi(*args):
    """Return the largest of the given sub-index values.

    ``None`` entries (sub-indices that could not be computed) are ignored.
    Returns ``None`` when no non-``None`` value is supplied, so a row with
    no usable measurement yields a missing AQI instead of raising.
    """
    present = [value for value in args if value is not None]
    return max(present, default=None)


: 

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col

# Expose the pure-Python AQI formula to Spark as a UDF that returns a double.
calculate_aqi_udf = udf(calculate_aqi, DoubleType())

# Pollutant columns, in the argument order calculate_aqi expects.
pollutant_inputs = [col(name) for name in ("pm2_5", "pm10", "no2", "so2", "co", "ozone")]

data = data.withColumn("AQI", calculate_aqi_udf(*pollutant_inputs))

: 

In [None]:
# Narrow the frame to the timestamps, the six pollutants that feed the AQI,
# the location columns and the AQI itself.
relevant_columns = [
    'from_date', 'to_date',
    'pm2_5', 'pm10', 'no2', 'so2', 'co', 'ozone',
    'state', 'city',
    'AQI',
]

data = data.select(relevant_columns)

: 

In [None]:
# Rows of one city ordered by interval start; shared by both lag columns.
# NOTE(review): if one city has several stations, rows with the same
# from_date have no defined order within this window -- confirm intent.
city_timeline = Window.partitionBy("city").orderBy("from_date")

# AQI of the previous row and of the row before that, within the same city.
for steps_back in (1, 2):
    data = data.withColumn(f"AQI_lag{steps_back}", lag("AQI", steps_back).over(city_timeline))

# Calendar year of the interval start.
data = data.withColumn("year", year("from_date"))

: 

# EDA

In [None]:
# Print the schema again now that the AQI, lag and year columns have been added.
data.printSchema()

: 

In [None]:
# Summary statistics for each pollutant, shown as one small table per column.
numerical_features = ['no2', 'so2','co', 'ozone','pm2_5', 'pm10']
for feature in numerical_features:
    feature_summary = data.describe(feature)
    feature_summary.show()


: 

In [None]:
# Bring the timestamp and pollutant columns to pandas and draw one subplot
# per pollutant against from_date.
# NOTE: toPandas() collects every selected row onto the driver.
pollutant_history = data.select(
    'from_date', 'pm2_5', 'pm10', 'no2', 'so2', 'co', 'ozone'
).toPandas()
pollutant_history.plot(x='from_date', subplots=True, figsize=(15, 10))
plt.show()


: 

In [None]:
# Box plots of three pollutants, grouped by state.
# NOTE: toPandas() collects every selected row onto the driver.
pollutants_by_state = data.select('state', 'pm2_5', 'pm10', 'no2').toPandas()
pollutants_by_state.boxplot(by='state', figsize=(35, 10))
plt.show()

: 

In [None]:

# Average AQI per year. The aggregation runs in Spark so that only one row
# per year is collected to the driver, instead of every (year, AQI) pair.
# Rows with a missing year or AQI are dropped first, which matches how a
# pandas groupby().mean() ignores them.
aqi_trend = (
    data.dropna(subset=["year", "AQI"])
    .groupBy("year")
    .agg(mean("AQI").alias("AQI"))
    .orderBy("year")
    .toPandas()
)

plt.figure(figsize=(10, 6))
sns.lineplot(data=aqi_trend, x="year", y="AQI", marker='o')
plt.title("AQI Trend Per Year")
plt.xlabel("Year")
plt.ylabel("AQI")
plt.grid(True)
plt.show()


: 

# Model Training

#### Train/Test Split

In [None]:
# Random 80/20 split of the rows with a fixed seed.
# NOTE(review): rows are assigned at random, not by time -- confirm this is intended for time-series data.
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

: 

#### Model Training

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# No try/except around training: catching the exception and printing only its
# type hid the real traceback and left `rmse` undefined, so the next cell
# failed with an unrelated NameError. Let failures surface here in full.

# The model uses "year" as its only input feature; the AQI_lag columns
# created earlier are not part of the feature vector.
assembler = VectorAssembler(inputCols=["year"], outputCol="features")
rf = RandomForestRegressor(featuresCol="features", labelCol="AQI")

pipeline = Pipeline(stages=[assembler, rf])

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

# Root mean squared error of the predictions on the held-out split.
evaluator = RegressionEvaluator(labelCol="AQI", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)


: 

#### RMSE

In [None]:
# Report the test-set RMSE computed in the training cell.
print("Root Mean Squared Error :", rmse)

: 

In [None]:
# Shut down the Spark session; DataFrames created from it cannot be used afterwards.
spark.stop()

: 