##Author 
****
Karen Rugerio Armenta

## Business case
****

Predict US air quality data metrics based on the data obtained from 2017 to 2021. 

- Obtained from Kaggle: https://www.kaggle.com/datasets/yellowj4acket/air-quality. 

- Credits to https://www.epa.gov/

## Case Description
****
Use MLlib for regression model with a database that is characterized by a large volume of data.

Evaluate the generated model with the PySpark tools.

Test the model on a set of data and make some predictions.



#Import Libraries

In [None]:
#Update so we can work with Spark
!sudo apt update

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:14 http://a

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.2.2//spark-3.2.2-bin-hadoop3.2.tgz
!tar xf spark-3.2.2-bin-hadoop3.2.tgz
#Configurating Spark with Python
!pip install -q findspark
!pip install pyspark

#Setting environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop3.2"

#Installing Spark
import findspark
findspark.init()
findspark.find()

#PySpark Imports
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

#For linear regression
from pyspark.ml.regression import LinearRegression

#Numpy
import numpy as np

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


#Create Pyspark Session

**We create a Pyspark Session so we can interact with different spark's functionality with a less number of constructs. This means that instead of having a spark context, SQL context, etc. all of it is encapsulated in a Spark session. We will call it PySpark_portafolio.**

In [211]:
#Creating Spark Session
spark_session = SparkSession.builder.appName('PySpark_portafolio').getOrCreate()
spark_session

#ETL Process

**The next step is importing the dataset into the session, so we can perform an etl process for a better model prediction.**

In [212]:
#Reading dataset in Spark Session
df  = spark_session.read.csv('aqi_data_17_21.csv', header = True, inferSchema=True)
df

DataFrame[_c0: int, state_code: int, county_code: int, site_number: int, latitude: double, longitude: double, parameter: string, sample_duration: string, pollutant_standard: string, date_local: string, units_of_measure: string, event_type: string, observation_count: int, observation_percent: double, validity_indicator: string, arithmetic_mean: double, first_max_value: double, first_max_hour: int, aqi: double, method: string, local_site_name: string, site_address: string, state: string, county: string, city: string, date_of_last_change: string, year: int]

**With the following command we visualize the data, it shows only the 20 first rows**

In [213]:
#Showing Dataset information
df.show()

+-------+----------+-----------+-----------+------------------+----------+--------------------+---------------+------------------+----------+--------------------+----------+-----------------+-------------------+------------------+---------------+---------------+--------------+----+--------------------+-----------------+--------------------+-------+-------+--------+-------------------+----+
|    _c0|state_code|county_code|site_number|          latitude| longitude|           parameter|sample_duration|pollutant_standard|date_local|    units_of_measure|event_type|observation_count|observation_percent|validity_indicator|arithmetic_mean|first_max_value|first_max_hour| aqi|              method|  local_site_name|        site_address|  state| county|    city|date_of_last_change|year|
+-------+----------+-----------+-----------+------------------+----------+--------------------+---------------+------------------+----------+--------------------+----------+-----------------+-------------------+---

**It is also important to visualize the data types, so we can perform a better transformation of the data to construct a better model**

In [214]:
#Dataset datatype information
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- state_code: integer (nullable = true)
 |-- county_code: integer (nullable = true)
 |-- site_number: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- parameter: string (nullable = true)
 |-- sample_duration: string (nullable = true)
 |-- pollutant_standard: string (nullable = true)
 |-- date_local: string (nullable = true)
 |-- units_of_measure: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- observation_count: integer (nullable = true)
 |-- observation_percent: double (nullable = true)
 |-- validity_indicator: string (nullable = true)
 |-- arithmetic_mean: double (nullable = true)
 |-- first_max_value: double (nullable = true)
 |-- first_max_hour: integer (nullable = true)
 |-- aqi: double (nullable = true)
 |-- method: string (nullable = true)
 |-- local_site_name: string (nullable = true)
 |-- site_address: string (nullable = true)
 |-- state: string (n

**With the following command we can see how much data (rows) we have in our dataset**

In [215]:
#Determinate the number of data has the dataset
df.count()

3976622

**As part of the cleaning process, we have to drop null values. I decided to do it by filtering rows that contain "null" values. The reason I decided to do it this way is because the data set I choose has "null" string value instead of a "null pyspark object", therefore I can not run a na.drop() function**

In [216]:
#Dropping null values. 
df = df.filter(~col("_c0").contains("null"))
df = df.filter(~col("state_code").contains("null"))
df = df.filter(~col("county_code").contains("null"))
df = df.filter(~col("site_number").contains("null"))
df = df.filter(~col("latitude").contains("null"))
df = df.filter(~col("longitude").contains("null"))
df = df.filter(~col("parameter").contains("null"))
df = df.filter(~col("sample_duration").contains("null"))
df = df.filter(~col("pollutant_standard").contains("null"))
df = df.filter(~col("date_local").contains("null"))
df = df.filter(~col("units_of_measure").contains("null"))
df = df.filter(~col("event_type").contains("null"))
df = df.filter(~col("observation_count").contains("null"))
df = df.filter(~col("validity_indicator").contains("null"))
df = df.filter(~col("observation_percent").contains("null"))
df = df.filter(~col("arithmetic_mean").contains("null"))
df = df.filter(~col("first_max_value").contains("null"))
df = df.filter(~col("first_max_hour").contains("null"))
df = df.filter(~col("aqi").contains("null"))
df = df.filter(~col("method").contains("null"))
df = df.filter(~col("local_site_name").contains("null"))
df = df.filter(~col("site_address").contains("null"))
df = df.filter(~col("state").contains("null"))
df = df.filter(~col("county").contains("null"))
df = df.filter(~col("city").contains("null"))
df = df.filter(~col("date_of_last_change").contains("null"))
df = df.filter(~col("year").contains("null"))
df.show()

+-------+----------+-----------+-----------+------------------+----------+--------------------+---------------+------------------+----------+--------------------+----------+-----------------+-------------------+------------------+---------------+---------------+--------------+----+--------------------+-----------------+--------------------+-------+-------+--------+-------------------+----+
|    _c0|state_code|county_code|site_number|          latitude| longitude|           parameter|sample_duration|pollutant_standard|date_local|    units_of_measure|event_type|observation_count|observation_percent|validity_indicator|arithmetic_mean|first_max_value|first_max_hour| aqi|              method|  local_site_name|        site_address|  state| county|    city|date_of_last_change|year|
+-------+----------+-----------+-----------+------------------+----------+--------------------+---------------+------------------+----------+--------------------+----------+-----------------+-------------------+---

**Now we can see how much values are left after dropping null values.**

In [217]:
#Determinate the new nunmber of data has the dataset after dropping nulls
df.count()

3219722

**This dataset has 14 columns with categorical information. Therefore is important to have a way to represent this information that can help with the model. I decided to do a label encoding instead of a One Hot Encoding as there are many values a column can have.** 


**Pyspark provides an indexer tool for encoding the levels of categorical features into numeric values. LabelEncoder as it name says encode labels with a value between 0 and n-1 where n is the number of distinct labels. The way it is optimized is that if a label repeats it assigns the same value to the previously assigned.**

In [218]:
#Label Encoding
indexer_parameter = StringIndexer(inputCol="parameter", outputCol="parameter_index") 
df = indexer_parameter.fit(df).transform(df)
indexer_sample_duration= StringIndexer(inputCol="sample_duration", outputCol="sample_duration_index") 
df = indexer_sample_duration.fit(df).transform(df) 
indexer_pollutant_standard= StringIndexer(inputCol="pollutant_standard", outputCol="pollutant_standard_index") 
df = indexer_pollutant_standard.fit(df).transform(df) 
indexer_date_local= StringIndexer(inputCol="date_local", outputCol="date_local_index") 
df = indexer_date_local.fit(df).transform(df) 
indexer_units_of_measure= StringIndexer(inputCol="units_of_measure", outputCol="units_of_measure_index") 
df = indexer_units_of_measure.fit(df).transform(df) 
indexer_event_type= StringIndexer(inputCol="event_type", outputCol="event_type_index") 
df = indexer_event_type.fit(df).transform(df) 
indexer_validity_indicator= StringIndexer(inputCol="validity_indicator", outputCol="validity_indicator_index") 
df = indexer_validity_indicator.fit(df).transform(df) 
indexer_method= StringIndexer(inputCol="method", outputCol="method_index") 
df = indexer_method.fit(df).transform(df) 
indexer_local_site_name= StringIndexer(inputCol="local_site_name", outputCol="local_site_name_index") 
df = indexer_local_site_name.fit(df).transform(df) 
indexer_site_address= StringIndexer(inputCol="site_address", outputCol="site_address_index") 
df = indexer_site_address.fit(df).transform(df) 
indexer_state= StringIndexer(inputCol="state", outputCol="state_index") 
df = indexer_state.fit(df).transform(df) 
indexer_country= StringIndexer(inputCol="county", outputCol="county_index") 
df = indexer_country.fit(df).transform(df) 
indexer_city= StringIndexer(inputCol="city", outputCol="city_index") 
df = indexer_city.fit(df).transform(df) 
indexer_date_of_last_change= StringIndexer(inputCol="date_of_last_change", outputCol="date_of_last_change_index") 
df = indexer_date_of_last_change.fit(df).transform(df) 
df.show()

+-------+----------+-----------+-----------+------------------+----------+--------------------+---------------+------------------+----------+--------------------+----------+-----------------+-------------------+------------------+---------------+---------------+--------------+----+--------------------+-----------------+--------------------+-------+-------+--------+-------------------+----+---------------+---------------------+------------------------+----------------+----------------------+----------------+------------------------+------------+---------------------+------------------+-----------+------------+----------+-------------------------+
|    _c0|state_code|county_code|site_number|          latitude| longitude|           parameter|sample_duration|pollutant_standard|date_local|    units_of_measure|event_type|observation_count|observation_percent|validity_indicator|arithmetic_mean|first_max_value|first_max_hour| aqi|              method|  local_site_name|        site_address|  st

**Now we are ready to drop the old categorical columns, as we have indexed ones. And drop other columns that are not that valuable for the model. This information is based on the Kaggle dataset overall analysis**

In [219]:
#Dropping Categorical Columns
df = df.drop("parameter")
df = df.drop("sample_duration")
df = df.drop("pollutant_standard")
df = df.drop("date_local")
df = df.drop("units_of_measure")
df = df.drop("event_type")
df = df.drop("validity_indicator")
df = df.drop("method")
df = df.drop("local_site_name")
df = df.drop("site_address")
df = df.drop("state")
df = df.drop("county")
df = df.drop("city")
df = df.drop("date_of_last_change")
#Deleating information after analizing the dataset
df = df.drop("parameter_index") #1 unique value
df = df.drop("event_type_index") #95% has a "none" string unique value
df = df.drop("observation_count") #95% has a "1" int value
df = df.drop("observation_percent") #95% has a "100" int value
df = df.drop("site_address") #has a lot of unique value
df = df.drop("units_of_measure_index") #1 unique value
df = df.drop("_c0") #ID

In [220]:
#Visualizing the dataset
df.show()

+----------+-----------+-----------+------------------+----------+---------------+---------------+--------------+----+----+---------------------+------------------------+----------------+------------------------+------------+---------------------+------------------+-----------+------------+----------+-------------------------+
|state_code|county_code|site_number|          latitude| longitude|arithmetic_mean|first_max_value|first_max_hour| aqi|year|sample_duration_index|pollutant_standard_index|date_local_index|validity_indicator_index|method_index|local_site_name_index|site_address_index|state_index|county_index|city_index|date_of_last_change_index|
+----------+-----------+-----------+------------------+----------+---------------+---------------+--------------+----+----+---------------------+------------------------+----------------+------------------------+------------+---------------------+------------------+-----------+------------+----------+-------------------------+
|         1| 

#Model

**The first thing we have to do is determinate the dependant and the independant variables.**

**The independant variables in this Regression Model will be store them in a list of features, called "Independent Features"**

In [221]:
#Defining independant variables
featassembler = VectorAssembler(inputCols=['state_code',
 'county_code',
 'latitude',
 'longitude',
 'site_number',
 'arithmetic_mean',
 'first_max_value',
 'first_max_hour',
 'year',
 'sample_duration_index',
 'sample_duration_index',
 'pollutant_standard_index',
 'date_local_index',
 'validity_indicator_index',
 'method_index',
 'site_address_index',
 'state_index',
 'county_index',
 'city_index',
 'date_of_last_change_index'
 ], outputCol = "Independent Features" )
featassembler

VectorAssembler_2b1d838e021e

**Now we are going to make this VectorAssembler part of our dataframe, so we have all independant variables stored in a single column**

In [222]:
#Add independant variables as a new column 
result = featassembler.transform(df)
result.show()

+----------+-----------+-----------+------------------+----------+---------------+---------------+--------------+----+----+---------------------+------------------------+----------------+------------------------+------------+---------------------+------------------+-----------+------------+----------+-------------------------+--------------------+
|state_code|county_code|site_number|          latitude| longitude|arithmetic_mean|first_max_value|first_max_hour| aqi|year|sample_duration_index|pollutant_standard_index|date_local_index|validity_indicator_index|method_index|local_site_name_index|site_address_index|state_index|county_index|city_index|date_of_last_change_index|Independent Features|
+----------+-----------+-----------+------------------+----------+---------------+---------------+--------------+----+----+---------------------+------------------------+----------------+------------------------+------------+---------------------+------------------+-----------+------------+---------

**Now we make a new dataframe that includes this features column and the independant varible**

In [223]:
#Creating new dataframe only with the Independant features and the Air Quality values
final_data = result.select("Independent features", "aqi")
final_data.show()

+--------------------+----+
|Independent features| aqi|
+--------------------+----+
|[1.0,3.0,30.49747...|21.0|
|[1.0,3.0,30.49747...|21.0|
|[1.0,3.0,30.49747...|21.0|
|[1.0,3.0,30.49747...|21.0|
|[1.0,3.0,30.49747...|22.0|
|[1.0,3.0,30.49747...|22.0|
|[1.0,3.0,30.49747...|22.0|
|[1.0,3.0,30.49747...|22.0|
|[1.0,3.0,30.49747...|19.0|
|[1.0,3.0,30.49747...|19.0|
|[1.0,3.0,30.49747...|19.0|
|[1.0,3.0,30.49747...|19.0|
|[1.0,3.0,30.49747...|30.0|
|[1.0,3.0,30.49747...|30.0|
|[1.0,3.0,30.49747...|30.0|
|[1.0,3.0,30.49747...|30.0|
|[1.0,3.0,30.49747...|16.0|
|[1.0,3.0,30.49747...|16.0|
|[1.0,3.0,30.49747...|16.0|
|[1.0,3.0,30.49747...|16.0|
+--------------------+----+
only showing top 20 rows



**It is important to define the data that is going to be for training the model, and the data that is going to be for testing the model.**

**This data splitting is used mainly to avoid overfitting. So we split the data into two sets, the train and the test dataset. For this model I decided to use 80% of the data to train the model, and the other 20% to test it.**

In [224]:
#Sppliting data into train and test with 80% of the total and 20% respectively 
train_data, test_data = final_data.randomSplit([0.80, 0.20])

**Now it's time to fit the model. To do so, I used the Linear Regression Pyspark API that takes two atributtes, the "featuresCol", these are the independant variables, and the "labelCol" that is the dependant variable**

**Then the Linear regression fit is done with the train data**

In [225]:
#Fitting the model given the independent features to predict Air Quality values
model = LinearRegression(featuresCol = 'Independent features', labelCol='aqi')
model = model.fit(train_data)

**At this point we have a trained model, now is time to see how good is the model performing**

In [226]:
#Getting model intercept 
model.intercept

348.50367396202375

**The intercept represents the mean value of the response variable when all of the predictor variables in the model are equal to zero. In this case we have a intercept of 348.503**

In [227]:
#Getting model beta coefficients
model.coefficients

DenseVector([-0.0202, 0.0044, -0.1067, 0.0446, -0.0001, 1.2899, 1.2899, -0.0466, -0.1627, 0.2498, 0.2498, -0.0026, 0.0002, 0.0, 0.0416, -0.0002, -0.0332, -0.0028, 0.0002, -0.0004])

**Getting the beta coefficients allows comparisons between independent variables to determine which has the most influence on the dependent variable. In this case we can see that B5, B6, B9 have the biggest values, therefore are the ones that influence the model the most.**
#REVISAR

In [228]:
#Getting R2 adjusted
model.summary.r2adj

0.8616042919724056

**The r2adj pyspark function, help us determinate the accuracy of the model. In this case we have an accuracy of 86% which is a pretty good percentage.**

**We have a very good model, but now is time to test it with the test dataset, to see if it is not overfitted. To do so, we will use the model.evaluate Pyspark function to make some Air Quality predictions**

In [229]:
#Adding a new column to the dataset to visualize predictions
prediction_result = model.evaluate(test_data)
prediction_result.predictions.show()



+--------------------+-----+-------------------+
|Independent features|  aqi|         prediction|
+--------------------+-----+-------------------+
|(20,[0,1,2,3,4,5,...| 13.0| 17.091288676730642|
|(20,[0,1,2,3,4,5,...|128.0| 130.96643009369754|
|(20,[0,1,2,3,4,5,...| 84.0|  81.77314315809855|
|(20,[0,1,2,3,4,5,...| 79.0|   76.4297310023012|
|(20,[0,1,2,3,4,5,...| 98.0|  99.17836307687875|
|(20,[0,1,2,3,4,5,...| 88.0|  87.73062027908429|
|(20,[0,1,2,3,4,5,...| 88.0|  87.72544752796142|
|(20,[0,1,2,3,4,5,...| 33.0|  31.65829027966396|
|(20,[0,1,2,3,4,5,...| 33.0| 31.653117528541088|
|(20,[0,1,2,3,4,5,...|  0.0|-1.7277932805557157|
|(20,[0,1,2,3,4,5,...|  0.0|-0.1774921810965111|
|(20,[0,1,2,3,4,5,...|  0.0| 1.3663656042513708|
|(20,[0,1,2,3,4,5,...|  0.0|  7.185886484077287|
|(20,[0,1,2,3,4,5,...|  0.0|    8.1122015427581|
|(20,[0,1,2,3,4,5,...|  0.0|   9.10345842524572|
|(20,[0,1,2,3,4,5,...|  0.0|   9.73202261026114|
|(20,[0,1,2,3,4,5,...|  2.0| 10.768213130630784|
|(20,[0,1,2,3,4,5,..

**We can see we have a big difference between the predictions and the actual Air Quality values, but we are just seeing 20 rows out of the over 1million rows dataset. So, to get more accurate statistics we will get the error gap of the whole dataset**

In [230]:
#Getting model errors
prediction_result.meanAbsoluteError, prediction_result.meanSquaredError

(4.302824461807081, 49.20795522160201)

**According with Nate Watson(2018) "The absolute error is the absolute value of the difference between the forecasted value and the actual value. MAE tells us how big of an error we can expect from the forecast on average." This tells us that the average difference between the Air Quality data value and the value predicted by the model is 4.3. And the MSE is 49.20**