# Submission Notebook

## DS 5110 Final Project 

## Air Quality Group 

* Daniel Heffley (dh3by)
* Camille Leonard (cvl7qu)
* Steph Verbout (sv8jy)
* Shahriar Shahrokhabadi (ss3qs)

# Data 

Our original data sources were the [EPA's Air Quality Data](https://aqs.epa.gov/aqsweb/airdata/download_files.html#Raw) and [Socio-Economic data from OpenIntro](https://www.openintro.org/data/?data=county_complete).

We compiled daily gas and particulate-matter measurements for 2017–2019 and then joined the air quality data with the socio-economic data by county. Note that the socio-economic data were published in 2010, several years before the air quality measurements.


## Import 

In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import os 
from pyspark.sql import SparkSession
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType, TimestampType, DecimalType
from pyspark.sql.functions import col, asc, to_date, unix_timestamp, to_timestamp, count, when, isnan
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, Imputer, StringIndexer
import matplotlib.pyplot as plt
from pyspark.ml.feature import StandardScaler
import pyspark.sql.functions as f


In [2]:
# Run from the shared project directory so the relative data paths used below resolve.
PROJECT_DIR = r'/project/ds5559/Air_Quality_Group'
os.chdir(PROJECT_DIR)
os.getcwd()

'/project/ds5559/Air_Quality_Group'

In [3]:
# Create (or reuse) the Spark session used throughout this notebook.
# NOTE(review): with master "local[*]" everything runs inside the driver JVM, so the
# executor.* settings below probably have no effect and "spark.driver.memory" bounds
# the usable heap — confirm against the Spark configuration documentation.
session_settings = {
    "spark.executor.memory": '20g',
    'spark.executor.cores': '5',
    'spark.executor.instances': '17',
    "spark.driver.memory": '1g',
}

session_builder = SparkSession.builder.master("local[*]").appName("Create Final Dataset")
for setting_key, setting_value in session_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()


In [4]:
# For viewing in the UI to monitor memory usage. Must be used from the desktop instance. 
# Uncomment the line below to display the Spark UI URL of the current session.
# spark.sparkContext.uiWebUrl

In [4]:
# Load the joined air-quality + income dataset; the first row holds column names and
# column types are inferred from the data (an explicit schema could be passed instead).
df_raw = spark.read.csv('DF_FINAL_AQI_W_INCOME.csv', header=True, inferSchema=True)

In [None]:
# Confirm the schema is correct 
# The CSV was read with inferSchema=True, so column types are guessed from the data — check them here.
df_raw.printSchema()

## Pre-Processing

### Column Removal

In [5]:
# Remove the CSV index column and the "state + county" helper column.
df_raw = df_raw.drop('_c0', 'state + county')

# Parse 'Date Local' into a DateType column.
df_raw = df_raw.withColumn('Date Local', col('Date Local').cast(DateType()))

# Site number plus the per-measurement "Parameter Code_*" / "Parameter Name_*" label
# columns are not used further in this notebook.
CL1 = ['Site Num',
       'Parameter Code_CO', 'Parameter Name_CO',
       'Parameter Code_SO2', 'Parameter Name_SO2',
       'Parameter Code_NO2']
CL2 = ['Parameter Name_NO2',
       'Parameter Code_O3', 'Parameter Name_O3',
       'Parameter Code_PM2_5FRM', 'Parameter Name_PM2_5FRM',
       'Parameter Code_PM2_5NON_FRM']
CL3 = ['Parameter Name_PM2_5NON_FRM',
       'Parameter Code_PM10MASS', 'Parameter Name_PM10MASS',
       'Parameter Code_PM10SPEC', 'Parameter Name_PM10SPEC']
CL4 = ['Parameter Code_HAPS', 'Parameter Name_HAPS',
       'Parameter Code_LEAD', 'Parameter Name_LEAD',
       'Parameter Code_NO', 'Parameter Name_NO']
CL5 = ['Parameter Code_PRESS', 'Parameter Name_PRESS',
       'Parameter Code_RH', 'Parameter Name_RH',
       'Parameter Code_TEMP', 'Parameter Name_TEMP']
CL6 = ['Parameter Code_WIND', 'Parameter Name_WIND']

identifier_cols = CL1 + CL2 + CL3 + CL4 + CL5 + CL6
df_raw = df_raw.drop(*identifier_cols)

In [6]:
# Remove the per-measurement event-type flags and the per-pollutant AQI columns listed
# below (the 'AQI' column is not in this list and is used later on).
event_type_sources = ['CO', 'SO2', 'NO2', 'O3', 'PM2_5FRM', 'PM2_5NON_FRM', 'PM10MASS', 'PM10SPEC']
aqi_sources = ['NO2', 'O3', 'PM2_5FRM', 'PM2_5NON_FRM', 'PM10MASS']

CL7 = ['Event Type_{}'.format(src) for src in event_type_sources]
CL8 = ['AQI_{}'.format(src) for src in aqi_sources]
df_raw = df_raw.drop(*(CL7 + CL8))

In [None]:
# Select only wind measurements that are Knots
# Currently disabled. If re-enabled it must run before the next cell, which drops
# 'Units of Measure_WIND'.
#df_raw = df_raw.filter(df_raw['Units of Measure_WIND'] == 'Knots')

In [7]:
# Drop the "Units of Measure_*" label columns for every measurement.
CL9 = ['Units of Measure_CO','Units of Measure_SO2','Units of Measure_NO2','Units of Measure_O3','Units of Measure_PM2_5FRM']
CL10 = ['Units of Measure_PM2_5NON_FRM','Units of Measure_PM10MASS','Units of Measure_PM10SPEC','Units of Measure_HAPS','Units of Measure_LEAD']
CL11 = ['Units of Measure_NO','Units of Measure_PRESS','Units of Measure_RH','Units of Measure_TEMP','Units of Measure_WIND']

# A single drop is sufficient: the previously repeated identical call was a no-op,
# because DataFrame.drop ignores column names that are no longer present.
df_raw = df_raw.drop(*CL9, *CL10, *CL11)

### Remove Duplicate Data 

In [8]:
# Remove fully duplicated rows (all columns equal); no subset is given, so this is
# equivalent to distinct(). Note this triggers a shuffle over the whole dataset.
df_raw = df_raw.dropDuplicates()

### Determine Which Columns Have Missing Values 

In [None]:
# Per-column share of non-null values in df_raw (1.0 means the column has no nulls).
# count(c) skips NULLs while count("*") counts every row, so the ratio is the non-null share.
# NOTE(review): NaN is not NULL in Spark, so NaN entries would still count as present.
# https://stackoverflow.com/questions/33900726/count-number-of-non-nan-entries-in-each-column-of-spark-dataframe-with-pyspark
all_rows = count("*")
present_share = [(count(name) / all_rows).alias(name) for name in df_raw.columns]
df_raw.select(present_share).show(vertical=True)

### Impute Missing Values - Fill with Median

In [9]:
# Median-impute the numeric columns below. For each input column c the Imputer writes
# a new column "c_imputed"; the original columns (which contain nulls) are then dropped.
imputeColumnList = ['Arithmetic Mean_HAPS', 'Arithmetic Mean_LEAD', 'Arithmetic Mean_NO', 'Arithmetic Mean_PRESS', 'Arithmetic Mean_RH', 'Arithmetic Mean_TEMP', 'Arithmetic Mean_WIND', 'Arithmetic Mean_PM2_5FRM',  'Arithmetic Mean_PM2_5NON_FRM','Arithmetic Mean_PM10MASS',  'Arithmetic Mean_PM10SPEC', 'pop_density', 'median_income']
# NOTE(review): stringColumnList is not used anywhere in this notebook, and these unit
# columns were already dropped earlier — candidate for removal.
stringColumnList = ['Units of Measure_PM2_5FRM','Units of Measure_PM2_5NON_FRM','Units of Measure_PM10MASS','Units of Measure_PM10SPEC']

imputed_names = ['{}_imputed'.format(name) for name in imputeColumnList]
imputer = Imputer(strategy='median', inputCols=imputeColumnList, outputCols=imputed_names)

# Fit the medians on df_raw, add the *_imputed columns, and remove the raw versions.
imputer_model = imputer.fit(df_raw)
df = imputer_model.transform(df_raw).drop(*imputeColumnList)

In [None]:
# List the columns remaining after imputation (imputed columns carry an "_imputed" suffix).
df.columns 

In [None]:
# Re-check the per-column non-null share after imputation (1.0 means no nulls remain).
# count(col) excludes NULLs; count("*") counts all rows.
# https://stackoverflow.com/questions/33900726/count-number-of-non-nan-entries-in-each-column-of-spark-dataframe-with-pyspark
total_row_count = count("*")
df.select([(count(field) / total_row_count).alias(field) for field in df.columns]).show(vertical=True)

In [None]:
# Look at a record to confirm no processing errors 
# Disabled spot-check: samples a tiny fraction (1e-6) of rows with a fixed seed and
# shows the first sampled row in vertical layout.
#df.sample(0.000001, seed = 314).show(1, vertical=True)

In [None]:
# (rows, columns) after imputation; count() runs a full Spark job.
row_total = df.count()
print((row_total, len(df.columns)))

### Remove Rows with Missing AQI and Category Data

In [10]:
# Drop every row that still has a null in any column (the imputed columns have none,
# so the remaining nulls presumably come from columns such as AQI / Category).
df = df.dropna(how="any")

In [None]:
# Confirm that every column is now fully populated (all ratios should be 1.0).
# count(column) ignores NULLs, count("*") does not.
# https://stackoverflow.com/questions/33900726/count-number-of-non-nan-entries-in-each-column-of-spark-dataframe-with-pyspark
row_count_expr = count("*")
completeness = [(count(column) / row_count_expr).alias(column) for column in df.columns]
df.select(completeness).show(vertical=True)

In [None]:
# (rows, columns) after removing rows with nulls.
remaining_rows = df.count()
column_total = len(df.columns)
print((remaining_rows, column_total))

### Outlier Detection

In [11]:
# Outlier fences are computed for every column except the date / identifier / category ones.
CL12 = ['Date Local', 'State Code', 'County Code', 'State Name', 'County Name', 'Category', 'fips']
df_out = df.drop(*CL12)

# First and third quartile per column; relativeError=0 requests exact quantiles,
# which is expensive on large data.
bounds = {}
for column_name in df_out.columns:
    quartiles = df_out.approxQuantile(column_name, [0.25, 0.75], 0)
    bounds[column_name] = dict(zip(("q1", "q3"), quartiles))

In [12]:
# Tukey fences: values more than 1.5 * IQR below Q1 or above Q3 are treated as outliers.
for fences in bounds.values():
    spread = fences['q3'] - fences['q1']
    fences['lower'] = fences['q1'] - spread * 1.5
    fences['upper'] = fences['q3'] + spread * 1.5

In [13]:
# Keep only rows whose value in EVERY fenced column lies inside [lower, upper].
# Bug fix: the previous version tested `(x > lower) | (x < upper)` per column. Since
# lower <= upper, every non-null value satisfies that, so no rows were ever removed.
# The inclusive `between` check matches the one used to flag outliers in the next cell,
# and `bounds` holds exactly the columns of df_out (all non-identifier columns).
# NOTE(review): columns filled mostly by median imputation can have IQR == 0, which
# collapses their fences to a single value and removes every non-median row — inspect
# the row count after this cell and exclude such columns if needed.
in_bounds = reduce(
    lambda keep, cond: keep & cond,
    [df[c].between(bounds[c]['lower'], bounds[c]['upper']) for c in bounds],
)
df = df.filter(in_bounds)



In [14]:
# Show one row with an extra 0/1 flag per fenced column: 1 when the value falls outside
# that column's inclusive [lower, upper] fences, 0 otherwise.
# `f` (pyspark.sql.functions) is already imported in the setup cell; the redundant
# re-import that used to live here has been removed.
outlier_flags = [
    f.when(f.col(c).between(bounds[c]['lower'], bounds[c]['upper']), 0)
     .otherwise(1)
     .alias(c + "_out")
    for c in df_out.columns
]
df.select("*", *outlier_flags).show(1, vertical=True)

-RECORD 0------------------------------------------------------
 Date Local                               | 2018-03-21         
 State Code                               | 29                 
 County Code                              | 510                
 State Name                               | Missouri           
 County Name                              | St. Louis City     
 Arithmetic Mean_CO                       | 0.247625           
 Arithmetic Mean_SO2                      | 0.036364           
 Arithmetic Mean_NO2                      | 10.245833          
 Arithmetic Mean_O3                       | 0.029824           
 AQI                                      | 43.0               
 Category                                 | Good               
 fips                                     | 29510              
 Arithmetic Mean_PM10SPEC_imputed         | 1.0                
 Arithmetic Mean_LEAD_imputed             | 0.0032             
 Arithmetic Mean_HAPS_imputed           

In [None]:
# CHECKPOINT DATASET: write the cleaned frame as CSV with a header row, using a single
# partition so Spark produces one part file inside the 'df_cleaned.csv' output directory.
# The built-in "csv" source is what the legacy 'com.databricks.spark.csv' name maps to.
df.repartition(1).write.option('header', 'true').csv('df_cleaned.csv')