In [None]:
#Import libraries as needed
import os
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, first, last, lag, lead, when
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Spark configuration: local mode on all available cores, driver memory set to 4g.
conf = SparkConf().setAppName('yuck').setMaster("local[*]").set("spark.driver.memory", "4g")
# Reuse a live context/session when this cell is executed again; constructing
# SparkContext directly raises an error if a context already exists in the kernel.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [None]:
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Schema for the CSV files: 'date' is read as a plain string, every
# measurement column as a nullable double.
_measurement_fields = [
    'BEN', 'CO', 'EBE', 'MXY', 'NMHC',
    'NO_2', 'NOx', 'OXY', 'O_3', 'PM10',
    'PM25', 'PXY', 'SO_2', 'TCH', 'TOL',
]
schema = StructType(
    [StructField('date', StringType(), True)]
    + [StructField(field_name, DoubleType(), True) for field_name in _measurement_fields]
)

In [None]:
# Read the CSV files found under data_path into spark_df, using the
# explicit schema and treating the first line of each file as a header
data_path = 'C:\\Users\\eleni\\Documents\\Diplw\\Jupyter-Notebooks\\diplw\\csvs_per_year'
spark_df = spark.read.schema(schema).option('header', True).csv(data_path)

In [None]:
#STEP 1: Impute Missing Values

In [None]:
# Parse the 'date' strings into a timestamp-typed column called 'unix_time',
# then discard the original string column
from pyspark.sql.functions import unix_timestamp, to_utc_timestamp
from pyspark.sql.types import TimestampType
parsed_time = unix_timestamp(spark_df['date'], 'yyyy-MM-dd HH:mm:ss').cast('timestamp')
spark_df = spark_df.withColumn('unix_time', parsed_time).drop('date')

In [None]:
# Order the DataFrame by unix_time and add a row number column.
# The window has no partitionBy clause, so the ordering (and therefore
# row_num) is computed over the whole DataFrame as a single group.
window = Window.orderBy('unix_time')
spark_df = spark_df.withColumn('row_num', row_number().over(window))

In [None]:
#FILL IN MISSING VALUES STARTS HERE
# For every column other than 'unix_time' and 'row_num', append two helper
# columns holding the value of the previous row and of the next row.
# NOTE(review): the fill step in the following cell relies on last()/first()
# window functions rather than on these helper columns - confirm they are needed.
bookkeeping_cols = ("unix_time", "row_num")
value_cols = [name for name in spark_df.columns if name not in bookkeeping_cols]
for value_col in value_cols:
    spark_df = (
        spark_df
        .withColumn(f"{value_col}_prev", lag(value_col).over(window))
        .withColumn(f"{value_col}_next", lead(value_col).over(window))
    )

In [None]:
# Fill missing values in each measurement column with the midpoint of the
# nearest non-null readings before and after the gap (in unix_time order).
from pyspark.sql.functions import coalesce, col

# Explicit frames are required. With the window's default frame (start of the
# data up to the current row) first(..., ignorenulls=True) never looks past the
# current row, so it yields the first reading of the whole series instead of
# the next reading after the gap.
backward_frame = window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
forward_frame = window.rowsBetween(Window.currentRow, Window.unboundedFollowing)

for col_name in spark_df.columns:
    # Skip the bookkeeping columns and the *_prev / *_next helper columns;
    # only real measurement columns are filled.
    if col_name in ("unix_time", "row_num") or col_name.endswith(("_prev", "_next")):
        continue
    prev_value = last(col_name, True).over(backward_frame)   # last non-null at or before this row
    next_value = first(col_name, True).over(forward_frame)   # first non-null at or after this row
    # Midpoint of both neighbours; at the start/end of the series only one
    # neighbour exists, so fall back to that single value.
    filled = coalesce((prev_value + next_value) / 2, prev_value, next_value)
    spark_df = spark_df.withColumn(
        col_name,
        when(col(col_name).isNull(), filled).otherwise(col(col_name)))

In [None]:
# Remove the *_prev / *_next helper columns again
helper_cols = [name for name in spark_df.columns if name.endswith(("_prev", "_next"))]
if helper_cols:
    spark_df = spark_df.drop(*helper_cols)
#END FILL IN MISSING VALUES

In [None]:
#STEP 2: ADD AQI COLUMNS

In [None]:
#ADD AQI
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the breakpoints and corresponding index levels for each pollutant.
# Each list holds 7 ascending edges that delimit 6 bands: a concentration c with
# edges[i] <= c < edges[i+1] is assigned index level i+1 (see calculate_index_level).
# NOTE(review): units are presumably ug/m3 and the values look like the European
# Air Quality Index bands - confirm against the source used.
breakpoints = {
    'PM25': [0, 10, 20, 25, 50, 75, 800],
    'PM10': [0, 20, 40, 50, 100, 150, 1200],
    'NO_2': [0, 40, 90, 120, 230, 340, 1000],
    'O_3': [0, 50, 100, 130, 240, 380, 800],
    'SO_2': [0, 100, 200, 350, 500, 750, 1250]
}
# Define each category; position k (0-based) is the label for index level k+1
categories = ['Good', 'Fair', 'Moderate', 'Poor', 'Very Poor', 'Extremely Poor']

In [None]:
# Define a function to calculate the index level for each pollutant concentration
from pyspark.sql.functions import array, array_max
#import pyspark.sql.functions as F

def calculate_index_level(pollutant, concentration, breakpoints_table=None):
    """Map a pollutant concentration to its 1-based index level.

    Args:
        pollutant: Key identifying the pollutant in the breakpoint table.
        concentration: Measured concentration, or None/NaN when missing.
        breakpoints_table: Optional mapping of pollutant -> ascending list of
            band edges. Defaults to the module-level ``breakpoints`` dict.

    Returns:
        The level ``i + 1`` of the band ``edges[i] <= concentration < edges[i+1]``.
        Values below the lowest edge are assigned level 1, values at or above
        the highest edge the top level. Returns None for a missing (None or
        NaN) concentration.

    Raises:
        KeyError: If ``pollutant`` is not present in the breakpoint table.
    """
    table = breakpoints if breakpoints_table is None else breakpoints_table
    edges = table[pollutant]
    top_level = len(edges) - 1

    # A missing reading has no level. NaN is detected via self-inequality;
    # previously NaN failed every comparison and was reported as the worst level.
    if concentration is None or concentration != concentration:
        return None

    # Below-range readings (e.g. small negative sensor values) belong to the
    # lowest band; previously they fell through to the worst level.
    if concentration < edges[0]:
        return 1

    for level in range(1, top_level + 1):
        if concentration < edges[level]:
            return level
    return top_level  # at or above the highest breakpoint

In [None]:
from pyspark.sql.functions import udf

def _overall_aqi_index(no2_conc, o3_conc, pm10_conc, pm25_conc, so2_conc):
    """Return the worst (highest) per-pollutant index level for one row.

    Null concentrations are skipped, so the result is based on the pollutants
    that were actually measured. Returns None when no level can be computed
    (all five inputs are null) instead of raising inside the Python worker.
    """
    readings = (
        ('NO_2', no2_conc),
        ('O_3', o3_conc),
        ('PM10', pm10_conc),
        ('PM25', pm25_conc),
        ('SO_2', so2_conc),
    )
    levels = []
    for pollutant_name, concentration in readings:
        if concentration is None:
            continue
        level = calculate_index_level(pollutant_name, concentration)
        # Tolerate a helper that reports "no level" as None.
        if level is not None:
            levels.append(level)
    return int(max(levels)) if levels else None


# Spark UDF over the five concentration columns (NO_2, O_3, PM10, PM25, SO_2);
# yields an integer index level, or null when none could be determined.
calculate_aqi_index_udf = udf(_overall_aqi_index, IntegerType())

In [None]:
from pyspark.sql.functions import when

# Add AQI index as a new column to the DataFrame using the UDF
spark_df = spark_df.withColumn('AQI_Index', calculate_aqi_index_udf('NO_2','O_3','PM10', 'PM25', 'SO_2'))

# UDF mapping a 1-based index level to its category label. A null index gives
# a null label instead of raising TypeError inside the Python worker.
calculate_aqi_category_udf = udf(
    lambda index_level: None if index_level is None else categories[index_level-1],
    StringType())

# Add AQI category as a new column to the DataFrame using the UDF
spark_df = spark_df.withColumn('AQI_Category', calculate_aqi_category_udf('AQI_Index'))

# Create a new column named 'AQI_GenPop_Category': levels 1-3 are 'Safe', any
# other known level is 'Hazardous'; an unknown (null) level stays null rather
# than being labelled 'Hazardous'.
aqi_index = spark_df['AQI_Index']
spark_df = spark_df.withColumn(
    'AQI_GenPop_Category',
    when(aqi_index.between(1, 3), 'Safe').when(aqi_index.isNotNull(), 'Hazardous'))

# Create a new column named 'AQI_GenPop_Index': 0 for 'Safe', 1 for 'Hazardous',
# null when the category is unknown.
genpop_category = spark_df['AQI_GenPop_Category']
spark_df = spark_df.withColumn(
    'AQI_GenPop_Index',
    when(genpop_category == 'Safe', 0).when(genpop_category == 'Hazardous', 1))



In [None]:
# Print the column names and types of spark_df (sanity check after adding the AQI columns)
spark_df.printSchema()

In [None]:
#STEP 3: REMOVE OUTLIERS

In [None]:
#Outlier Handling and normalization in apache spark

from pyspark.ml.feature import RobustScaler
# Get the columns to process: every column that is not the timestamp, the row
# index or one of the derived AQI columns. Selecting by name rather than by a
# positional slice keeps this correct if columns are added or re-ordered.
non_pollutant_cols = {
    'unix_time', 'row_num',
    'AQI_Index', 'AQI_Category', 'AQI_GenPop_Category', 'AQI_GenPop_Index',
}
pollutants = [name for name in spark_df.columns if name not in non_pollutant_cols]
# Filled later with pollutant -> (lower_bound, upper_bound)
outliers = {}

In [None]:
# Show which columns were selected as pollutant measurements
print(pollutants)

In [None]:
# Record, per pollutant, the Tukey fences Q1 - 1.5*IQR and Q3 + 1.5*IQR.
# Quartiles are approximate (relative error 0.05).
for pollutant in pollutants:
    quartiles = spark_df.approxQuantile(pollutant, [0.25, 0.75], 0.05)
    spread = quartiles[1] - quartiles[0]
    outliers[pollutant] = (quartiles[0] - 1.5 * spread, quartiles[1] + 1.5 * spread)

In [None]:
# Start the cleaned DataFrame as a projection of all columns of spark_df
spark_df_clean = spark_df.select('*')

In [None]:
# Replace every value lying outside its column's (lower, upper) bounds with null.
# Each processed column is re-appended at the end of the column list, so after
# the loop the pollutant columns come last; later cells slice columns by position.
from pyspark.sql.functions import col
for pollutant in pollutants:
    backup_name = f"{pollutant}_orig"
    fence_low, fence_high = outliers[pollutant]
    original_value = col(backup_name)

    # In-range readings are kept, out-of-range readings become null
    masked = when(~original_value.between(fence_low, fence_high), None).otherwise(original_value).alias(pollutant)

    # NOTE(review): the final fillna passes the string "null" for a numeric
    # column - confirm what this step is intended to achieve.
    spark_df_clean = (
        spark_df_clean
        .withColumnRenamed(pollutant, backup_name)
        .select("*", masked)
        .drop(backup_name)
        .fillna({f"{pollutant}": "null"})
    )

In [None]:
# Interpolate -> fill the null values of 'spark_df_clean'
# Order the DataFrame by unix_time and (re)compute the row number column.
# The window has no partitionBy clause, so it spans the whole DataFrame.
window_clean = Window.orderBy('unix_time')
spark_df_clean = spark_df_clean.withColumn('row_num', row_number().over(window_clean))

In [None]:
# For every measurement column (i.e. not the timestamp, row index or AQI
# columns), append helper columns with the previous and the next row's value.
# NOTE(review): the fill step in the following cell relies on last()/first()
# window functions rather than on these helper columns - confirm they are needed.
excluded_cols = (
    "unix_time", "row_num",
    "AQI_Index", "AQI_Category", "AQI_GenPop_Index", "AQI_GenPop_Category",
)
value_cols = [name for name in spark_df_clean.columns if name not in excluded_cols]
for value_col in value_cols:
    spark_df_clean = (
        spark_df_clean
        .withColumn(f"{value_col}_prev", lag(value_col).over(window_clean))
        .withColumn(f"{value_col}_next", lead(value_col).over(window_clean))
    )

In [None]:
# Fill the nulls left by outlier removal with the midpoint of the nearest
# non-null readings before and after the gap (in unix_time order).
from pyspark.sql.functions import coalesce, col

# Explicit frames are required. With the window's default frame (start of the
# data up to the current row) first(..., ignorenulls=True) never looks past the
# current row, so it yields the first reading of the whole series instead of
# the next reading after the gap.
backward_frame_clean = window_clean.rowsBetween(Window.unboundedPreceding, Window.currentRow)
forward_frame_clean = window_clean.rowsBetween(Window.currentRow, Window.unboundedFollowing)

not_interpolated = (
    "unix_time", "row_num",
    "AQI_Index", "AQI_Category", "AQI_GenPop_Index", "AQI_GenPop_Category",
)
for col_name in spark_df_clean.columns:
    # Skip bookkeeping/AQI columns and the *_prev / *_next helper columns;
    # only real measurement columns are filled.
    if col_name in not_interpolated or col_name.endswith(("_prev", "_next")):
        continue
    prev_value = last(col_name, True).over(backward_frame_clean)   # last non-null at or before this row
    next_value = first(col_name, True).over(forward_frame_clean)   # first non-null at or after this row
    # Midpoint of both neighbours; at the start/end of the series only one
    # neighbour exists, so fall back to that single value.
    filled = coalesce((prev_value + next_value) / 2, prev_value, next_value)
    spark_df_clean = spark_df_clean.withColumn(
        col_name,
        when(col(col_name).isNull(), filled).otherwise(col(col_name)))

In [None]:
# Remove the *_prev / *_next helper columns again
helper_cols_clean = [name for name in spark_df_clean.columns if name.endswith(("_prev", "_next"))]
if helper_cols_clean:
    spark_df_clean = spark_df_clean.drop(*helper_cols_clean)

In [None]:
# PXY has 100 null values after interpolation; I will fill those separately
from pyspark.sql.functions import mean

# Average of the 'PXY' column, pulled to the driver as a Python scalar
mean_val = spark_df_clean.agg(mean('PXY')).first()[0]

# Replace the remaining nulls in 'PXY' with that average
spark_df_clean = spark_df_clean.na.fill(mean_val, subset=['PXY'])

In [None]:
# Check whether the interpolation succeeded: count the nulls left per column
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import isnan, when, count

# One aggregate per column: sum of a 0/1 "is null" flag, named after the column
null_flag_sums = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in spark_df_clean.columns
]
null_counts = spark_df_clean.agg(*null_flag_sums)

# Print out the null counts for each column
null_counts.show()

#END FILL IN MISSING VALUES

In [None]:
#STEP 4: Z-SCORE NORMALIZATION FOR MEAN=0 and STD=1 

In [None]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import expr
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import array

# Columns to normalize: everything after the first six columns of spark_df_clean
pollutants = spark_df_clean.columns[6:]

# Pack the columns to normalize into a single vector column called "features"
assembler = VectorAssembler(outputCol="features", inputCols=pollutants)
assembled_df = assembler.transform(spark_df_clean)

# Scaler configured to centre (withMean) and scale to unit standard deviation (withStd)
scaler = StandardScaler(withMean=True, withStd=True,
                        inputCol="features", outputCol="scaledFeatures")

# Fit on the assembled data, then apply the fitted model to that same data
scalerModel = scaler.fit(assembled_df)
scaled_df = scalerModel.transform(assembled_df)


In [None]:
# Convert the 'scaledFeatures' vector column into an array column
array_df = scaled_df.select("row_num", vector_to_array('scaledFeatures').alias('scaled_array'))

# Turn each array element into its own column, named after the pollutant at
# the same position, and leave out the intermediate 'scaled_array' column.
# The loop variable is deliberately not called `col`: that name would shadow
# pyspark.sql.functions.col and break earlier cells when they are re-run.
array_df = array_df.select(
    "row_num",
    *[array_df['scaled_array'][position].alias(pollutant_name)
      for position, pollutant_name in enumerate(pollutants)]
)

# Join the normalized pollutant columns with the first six (non-normalized)
# columns of the cleaned DataFrame, matching rows on row_num
norm_df = spark_df_clean.select(*spark_df_clean.columns[:6]).join(array_df, on="row_num", how="inner")

In [None]:
# Print the column names and types of the normalized DataFrame
norm_df.printSchema()

In [None]:
# Display the rows whose row_num is 1 through 5
norm_df.filter(norm_df['row_num'].isin(1, 2, 3, 4, 5)).show(5)

In [None]:
#Leave this area for debugging, printing rows,values,columns etc.

In [None]:
# Sample spark_df_clean to visualize data

# 10% of the rows, without replacement. The fixed seed makes a re-run draw the
# same sample, so the plots below are reproducible.
sampled_data = spark_df_clean.sample(withReplacement=False, fraction=0.1, seed=42)
pandas_df = sampled_data.toPandas()

In [None]:
# Split the columns into two groups, one per boxplot panel
df_top = pandas_df[['CO', 'EBE','PXY','NO_2', 'O_3', 'PM10', 'PM25','TCH']]
df_bottom = pandas_df[['BEN', 'MXY', 'NMHC', 'NOx', 'OXY', 'SO_2', 'TOL']]

fig, axs = plt.subplots(2, figsize=(10, 8))

# Upper panel: the 8 columns of df_top; lower panel: the 7 columns of df_bottom
for panel, frame in zip(axs, (df_top, df_bottom)):
    sns.boxplot(data=frame, palette='PuRd', ax=panel)
    panel.set_xticklabels(frame.columns, rotation=45)

plt.show()

In [None]:
pandas_df = pandas_df.reset_index(drop=True)

# Create a grid of subplots with 3 columns and as many rows as needed
# (5 rows for 15 pollutant columns)
n_cols = 3
n_rows = max(1, int(np.ceil(len(pollutants) / n_cols)))
fig, axs = plt.subplots(nrows=n_rows, ncols=n_cols, figsize=(12, 12), squeeze=False)
palette = sns.color_palette("PuRd", max(1, len(pollutants)))

# Plot a histogram for each column
for i, pollutant in enumerate(pollutants):
    # Subplot location from the column index. Local names avoid `col`, `mean`
    # and `std`, which would shadow pyspark functions imported earlier.
    ax = axs[i // n_cols, i % n_cols]
    ax.set_title(pollutant)

    # Use the finite values only (drops NaN and +/-inf) for the range, the bin
    # width and the plotted data, so all three are based on the same sample
    series = pandas_df[pollutant]
    data = series[np.isfinite(series)]
    if data.empty:
        # Nothing to plot for this column; leave the titled panel empty
        continue

    x_min = data.min()
    x_max = data.max()

    # Number of bins from a standard-deviation based bin width,
    # width = 3.5 * std / n^(1/3) (Scott's rule), not an IQR-based rule
    n = len(data)
    bin_width = 3.5 * np.std(data) / (n ** (1 / 3))
    num_bins = max(1, int(np.ceil((x_max - x_min) / bin_width))) if bin_width > 0 else 1

    # Plot the histogram with kde
    sns.histplot(data, kde=True, bins=num_bins, color=palette[i], ax=ax)

    # Add a vertical line for the mean
    data_mean = data.mean()
    ax.axvline(data_mean, color='k', linestyle='dashed', linewidth=1)

    # Add vertical lines one standard deviation either side of the mean
    data_std = data.std()
    ax.axvline(data_mean + data_std, color='#8C78F0', linestyle='dashed', linewidth=1)
    ax.axvline(data_mean - data_std, color='#8C78F0', linestyle='dashed', linewidth=1)

    # Axis labels: histplot's default statistic is the per-bin count, not a density
    ax.set_xlabel('Value')
    ax.set_ylabel('Count')
    ax.set_xlim([x_min, x_max])  # set x-axis range

# Adjust the spacing between subplots
fig.subplots_adjust(hspace=0.4, wspace=0.4)

# Show the plot
plt.show()

In [None]:
# Show the pollutant column list currently in use
print(pollutants)

In [None]:
# pandas DataFrame holding only the AQI_Category column.
# Note: despite its name, sampled_data here contains every row, not a sample.
sampled_data = spark_df_clean.select(spark_df_clean['AQI_Category'])
pdf = sampled_data.toPandas()

In [None]:
# Get the count of each category and calculate the percentage
category_count = pdf.groupby('AQI_Category').size()
category_percentage = category_count / category_count.sum() * 100

# groupby sorts the labels alphabetically; plot them from best to worst air
# quality instead. Categories that never occur are left out so that no empty
# slot is drawn.
severity_order = [label for label in categories if label in category_percentage.index]

# Create the bar plot
ax = sns.barplot(x=category_percentage.index, y=category_percentage.values,
                 order=severity_order, palette="BuPu")

# Set the axis labels and title
ax.set(xlabel='AQI Category', ylabel='Percentage', title='Percentage of AQI Categories')

# Show the plot
plt.show()

In [None]:
# Write the cleaned (not normalized) DataFrame as CSV with a header row,
# replacing any previous output at the target path.
# NOTE(review): the target lies inside the csvs_per_year folder that the load
# step reads from - confirm a re-run of the load does not pick this output up.
(
    spark_df_clean.write
    .mode('overwrite')
    .option('header', True)
    .csv('C:/Users/eleni/Documents/Diplw/Jupyter-Notebooks/diplw/csvs_per_year/clean_wo_norm.csv')
)


In [None]:
# Save the sampled pandas DataFrame to a CSV file at a fixed absolute path,
# without writing the index column
pandas_df.to_csv(
    'C:\\Users\\eleni\\Documents\\Diplw\\Jupyter-Notebooks\\diplw\\clean_sample.csv',
    index=False,
)

In [None]:
# Print the column names and types of the cleaned DataFrame
spark_df_clean.printSchema()

In [None]:
# Display the first 5 rows of spark_df (the DataFrame before outlier removal)
spark_df.show(5)