<a href="https://colab.research.google.com/github/JavOrraca/EPA-Air-Quality-Analysis/blob/master/EPA_Air_Quality_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BANA295 - Big Data Mgmt
# EPA Air Quality Analysis
Final Team Project
* Yiting Fu
* Javier Orraca
* Mark Planas

Install Java and Spark packages and dependencies for Google Colab Python Environment
 * If run in the future, look for correct Spark / Hadoop tgz and tarfile module
 * Source: https://towardsdatascience.com/pyspark-in-google-colab-6821c2faf41c

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark

# If wanting to run bq_helper code, use the following:
# !pip install -e git+https://github.com/SohierDane/BigQuery_Helper#egg=bq_helper

# Install libraries so that statsmodel package in Colab will not
# give logsumexp or factorial import errors (statsmodel wheel build takes a WHILE)
!pip install --upgrade Cython
!pip install --upgrade git+https://github.com/statsmodels/statsmodels

Requirement already up-to-date: Cython in /usr/local/lib/python3.6/dist-packages (0.29.10)
Collecting git+https://github.com/statsmodels/statsmodels
  Cloning https://github.com/statsmodels/statsmodels to /tmp/pip-req-build-46p78rb0
  Running command git clone -q https://github.com/statsmodels/statsmodels /tmp/pip-req-build-46p78rb0
Building wheels for collected packages: statsmodels
  Building wheel for statsmodels (setup.py) ... [?25l[?25hdone
  Stored in directory: /tmp/pip-ephem-wheel-cache-ivrgby0_/wheels/7d/ad/45/ac1a03bd759c2fa74c486e2b1950d94b55f511b4c2b0418bd5
Successfully built statsmodels
Installing collected packages: statsmodels
  Found existing installation: statsmodels 0.9.0
    Uninstalling statsmodels-0.9.0:
      Successfully uninstalled statsmodels-0.9.0
Successfully installed statsmodels-0.10.0rc2


Set the environment path that enables us to run PySpark in our Colab environment running the following code:

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
#gcloud dataproc clusters list

Import packages for data analysis and visualizations including NumPy, Pandas, Matplotlib, etc.

In [0]:
import numpy as np
import pandas as pd
import sys
# import matplotlib.pyplot as plot
import seaborn as sns
import io
import pprint
import plotly
import plotly.plotly as py
import plotly.tools as tls
import plotly.graph_objs as go
from pandas.io import gbq
from plotly.tools import FigureFactory as FF
from plotly.offline import init_notebook_mode, iplot
from plotly.plotly import plot_mpl
from statsmodels.tsa.seasonal import seasonal_decompose
import cufflinks as cf
init_notebook_mode(connected=True) # Use for offline maps

Provide your credentials to the runtime via Google Cloud SDK:

In [0]:
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Authenticated


In [0]:
from google.cloud import bigquery
import google.cloud.bigquery as bq

# Download the file into Google Colab tmp folder without affecting RAM limitation of Colab Python environment
!gsutil cp gs://bana295_epa_analysis/co_daily_summary-000000000000.csv /tmp/co_0.csv
!gsutil cp gs://bana295_epa_analysis/co_daily_summary-000000000001.csv /tmp/co_1.csv
!gsutil cp gs://bana295_epa_analysis/co_daily_summary-000000000002.csv /tmp/co_2.csv
!gsutil cp gs://bana295_epa_analysis/no2_daily_summary-000000000000.csv /tmp/no2_0.csv
!gsutil cp gs://bana295_epa_analysis/no2_daily_summary-000000000001.csv /tmp/no2_1.csv
!gsutil cp gs://bana295_epa_analysis/no2_daily_summary-000000000002.csv /tmp/no2_2.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000000.csv /tmp/o3_0.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000001.csv /tmp/o3_1.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000002.csv /tmp/o3_2.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000003.csv /tmp/o3_3.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000004.csv /tmp/o3_4.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000005.csv /tmp/o3_5.csv
!gsutil cp gs://bana295_epa_analysis/temperature_daily_summary-000000000000.csv /tmp/temp_0.csv
!gsutil cp gs://bana295_epa_analysis/temperature_daily_summary-000000000001.csv /tmp/temp_1.csv
!gsutil cp gs://bana295_epa_analysis/temperature_daily_summary-000000000002.csv /tmp/temp_2.csv
!gsutil cp gs://bana295_epa_analysis/temperature_daily_summary-000000000003.csv /tmp/temp_3.csv
!gsutil cp gs://bana295_epa_analysis/temperature_daily_summary-000000000004.csv /tmp/temp_4.csv
  
# Print results to ensure transfer worked, if desired.
#!cat /tmp/co_0.csv

Copying gs://bana295_epa_analysis/co_daily_summary-000000000000.csv...
/ [1 files][749.9 MiB/749.9 MiB]                                                
Operation completed over 1 objects/749.9 MiB.                                    
Copying gs://bana295_epa_analysis/co_daily_summary-000000000001.csv...
| [1 files][750.2 MiB/750.2 MiB]                                                
Operation completed over 1 objects/750.2 MiB.                                    
Copying gs://bana295_epa_analysis/co_daily_summary-000000000002.csv...
/ [1 files][750.0 MiB/750.0 MiB]                                                
Operation completed over 1 objects/750.0 MiB.                                    
Copying gs://bana295_epa_analysis/no2_daily_summary-000000000000.csv...
\ [1 files][379.4 MiB/379.4 MiB]                                                
Operation completed over 1 objects/379.4 MiB.                                    
Copying gs://bana295_epa_analysis/no2_daily_summary-00000000000

Read all files transferred from Google Cloud Storage buckets as Python dataframes and further bind the dataframes by rows

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

co_0 = spark.read.csv('/tmp/co_0.csv',inferSchema=True, header =True)
co_1 = spark.read.csv('/tmp/co_1.csv',inferSchema=True, header =True)
co_2 = spark.read.csv('/tmp/co_2.csv',inferSchema=True, header =True)
no2_0 = spark.read.csv('/tmp/no2_0.csv',inferSchema=True, header =True)
no2_1 = spark.read.csv('/tmp/no2_1.csv',inferSchema=True, header =True)
no2_2 = spark.read.csv('/tmp/no2_2.csv',inferSchema=True, header =True)
o3_0 = spark.read.csv('/tmp/o3_0.csv',inferSchema=True, header =True)
o3_1 = spark.read.csv('/tmp/o3_1.csv',inferSchema=True, header =True)
o3_2 = spark.read.csv('/tmp/o3_2.csv',inferSchema=True, header =True)
o3_3 = spark.read.csv('/tmp/o3_3.csv',inferSchema=True, header =True)
o3_4 = spark.read.csv('/tmp/o3_4.csv',inferSchema=True, header =True)
o3_5 = spark.read.csv('/tmp/o3_5.csv',inferSchema=True, header =True)
temp_0 = spark.read.csv('/tmp/temp_0.csv',inferSchema=True, header =True)
temp_1 = spark.read.csv('/tmp/temp_1.csv',inferSchema=True, header =True)
temp_2 = spark.read.csv('/tmp/temp_2.csv',inferSchema=True, header =True)
temp_3 = spark.read.csv('/tmp/temp_3.csv',inferSchema=True, header =True)
temp_4 = spark.read.csv('/tmp/temp_4.csv',inferSchema=True, header =True)

In [0]:
# Use the following code if you ever want to read your csv files as Pandas dataframes

#co_0 = pd.read_csv('/tmp/co_0.csv')
#co_1 = pd.read_csv('/tmp/co_1.csv')
#co_2 = pd.read_csv('/tmp/co_2.csv')
#no2_0 = pd.read_csv('/tmp/no2_0.csv')
#no2_1 = pd.read_csv('/tmp/no2_1.csv')
#no2_2 = pd.read_csv('/tmp/no2_2.csv')
#o3_0 = pd.read_csv('/tmp/o3_0.csv')
#o3_1 = pd.read_csv('/tmp/o3_1.csv')
#o3_2 = pd.read_csv('/tmp/o3_2.csv')
#o3_3 = pd.read_csv('/tmp/o3_3.csv')
#o3_4 = pd.read_csv('/tmp/o3_4.csv')
#o3_5 = pd.read_csv('/tmp/o3_5.csv')
#temp_0 = pd.read_csv('/tmp/temp_0.csv')
#temp_1 = pd.read_csv('/tmp/temp_1.csv')
#temp_2 = pd.read_csv('/tmp/temp_2.csv')
#temp_3 = pd.read_csv('/tmp/temp_3.csv')
#temp_4 = pd.read_csv('/tmp/temp_4.csv')

In [0]:
# Concatenate Pandas dataframes

#co = pd.concat([co_0, co_1, co_2])
#no2 = pd.concat([no2_0, no2_1, no2_2])
#o3 = pd.concat([o3_0, o3_1, o3_2, o3_3, o3_4, o3_5])
#temp = pd.concat([temp_0, temp_1, temp_2, temp_3, temp_4])

In [0]:
# Use Python and Spark to reduce and join the dataframes 
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import date_format, lit, col, array, create_map, struct

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

co = unionAll(co_0, co_1, co_2)
no2 = unionAll(no2_0, no2_1, no2_2)
o3 = unionAll(o3_0, o3_1, o3_2, o3_3, o3_4, o3_5)
temp = unionAll(temp_0, temp_1, temp_2, temp_3, temp_4)

In [0]:
# Test descriptive stats for Spark object:
co.filter("state_name = 'California' AND county_name = 'Orange'").describe().show()

+-------+----------+--------------------+------------------+--------------+-----+-------------------+-------------------+-----+---------------+--------------------+------------------+-----------------+----------+------------------+-------------------+------------------+------------------+-----------------+------------------+-----------------+--------------------+---------------+--------------------+----------+-----------+-------------+--------------------+
|summary|state_code|         county_code|          site_num|parameter_code|  poc|           latitude|          longitude|datum| parameter_name|     sample_duration|pollutant_standard| units_of_measure|event_type| observation_count|observation_percent|   arithmetic_mean|   first_max_value|   first_max_hour|               aqi|      method_code|         method_name|local_site_name|             address|state_name|county_name|    city_name|           cbsa_name|
+-------+----------+--------------------+------------------+--------------+---

In [0]:
# Create filter for Orange County on individual files
OC_co = co.filter("state_name = 'California' AND county_name = 'Orange'")
OC_no2 = no2.filter("state_name = 'California' AND county_name = 'Orange'")
OC_o3 = o3.filter("state_name = 'California' AND county_name = 'Orange'")
OC_temp = temp.filter("state_name = 'California' AND county_name = 'Orange'")

In [0]:
# See available fields in the co data
OC_co.show(5)

+----------+-----------+--------+--------------+---+--------+----------+-----+---------------+--------------------+------------------+-------------------+-----------------+----------+-----------------+-------------------+---------------+---------------+--------------+----+-----------+--------------------+---------------+--------------------+----------+-----------+-------------+--------------------+-------------------+
|state_code|county_code|site_num|parameter_code|poc|latitude| longitude|datum| parameter_name|     sample_duration|pollutant_standard|         date_local| units_of_measure|event_type|observation_count|observation_percent|arithmetic_mean|first_max_value|first_max_hour| aqi|method_code|         method_name|local_site_name|             address|state_name|county_name|    city_name|           cbsa_name|date_of_last_change|
+----------+-----------+--------+--------------+---+--------+----------+-----+---------------+--------------------+------------------+-------------------+--

In [0]:
OC_no2.show(5)

+----------+-----------+--------+--------------+---+---------+----------+-----+--------------------+---------------+------------------+-------------------+-----------------+----------+-----------------+-------------------+------------------+---------------+--------------+---+-----------+--------------------+--------------------+--------------------+----------+-----------+---------+--------------------+-------------------+
|state_code|county_code|site_num|parameter_code|poc| latitude| longitude|datum|      parameter_name|sample_duration|pollutant_standard|         date_local| units_of_measure|event_type|observation_count|observation_percent|   arithmetic_mean|first_max_value|first_max_hour|aqi|method_code|         method_name|     local_site_name|             address|state_name|county_name|city_name|           cbsa_name|date_of_last_change|
+----------+-----------+--------+--------------+---+---------+----------+-----+--------------------+---------------+------------------+-------------

In [0]:
OC_o3.show(5)

+----------+-----------+--------+--------------+---+------------------+----------+-----+--------------+--------------------+------------------+-------------------+-----------------+----------+-----------------+-------------------+--------------------+--------------------+--------------+---+-----------+-----------+--------------------+--------------------+----------+-----------+----------+--------------------+-------------------+
|state_code|county_code|site_num|parameter_code|poc|          latitude| longitude|datum|parameter_name|     sample_duration|pollutant_standard|         date_local| units_of_measure|event_type|observation_count|observation_percent|     arithmetic_mean|     first_max_value|first_max_hour|aqi|method_code|method_name|     local_site_name|             address|state_name|county_name| city_name|           cbsa_name|date_of_last_change|
+----------+-----------+--------+--------------+---+------------------+----------+-----+--------------+--------------------+----------

In [0]:
OC_temp.show(5)

+----------+-----------+--------+--------------+---+--------+----------+-----+-------------------+---------------+------------------+-------------------+------------------+----------+-----------------+-------------------+------------------+---------------+--------------+----+-----------+--------------------+---------------+--------------------+----------+-----------+-------------+--------------------+-------------------+
|state_code|county_code|site_num|parameter_code|poc|latitude| longitude|datum|     parameter_name|sample_duration|pollutant_standard|         date_local|  units_of_measure|event_type|observation_count|observation_percent|   arithmetic_mean|first_max_value|first_max_hour| aqi|method_code|         method_name|local_site_name|             address|state_name|county_name|    city_name|           cbsa_name|date_of_last_change|
+----------+-----------+--------+--------------+---+--------+----------+-----+-------------------+---------------+------------------+-----------------

Manipulate Spark data frame to Group By state, county, and date, while calculating average AQI and Arithmetic Mean (for )each pollutant). Create a new Date column with proper formatting for modeling in yyyy-MM. Then, create additional column to identify specific pollutant in each df and further  join the Spark dataframes together (saved below as Spark_Grouped).

Lastly, run select queries to only include only AQI and Date fields for Orange County to further evaluation via Spark using ARIMA, visualizing via Plotly, and exploring the seasonality in the data. We will regress in later steps.

In [0]:
# Create Spark dataframes for modeling purposes
co_grouped = co.groupBy('state_name', 'county_name', 'date_local').avg('aqi', 'arithmetic_mean')
co_grouped = co_grouped.withColumn("Date", date_format('date_local', "yyyy-MM"))
co_grouped = co_grouped.withColumn("Pollutant",lit("co"))

no2_grouped = no2.groupBy('state_name', 'county_name', 'date_local').avg('aqi', 'arithmetic_mean')
no2_grouped = no2_grouped.withColumn("Date", date_format('date_local', "yyyy-MM"))
no2_grouped = no2_grouped.withColumn("Pollutant",lit("no2"))

o3_grouped = o3.groupBy('state_name', 'county_name', 'date_local').avg('aqi', 'arithmetic_mean')
o3_grouped = o3_grouped.withColumn("Date", date_format('date_local', "yyyy-MM"))
o3_grouped = o3_grouped.withColumn("Pollutant",lit("o3"))

temp_grouped = temp.groupBy('state_name', 'county_name', 'date_local').avg('arithmetic_mean')
temp_grouped = temp_grouped.withColumn("Date", date_format('date_local', "yyyy-MM"))
temp_grouped = temp_grouped.withColumn("Measurement",lit("Fahrenheit"))

# Join the CO2, NO2, and O3 data sets
Spark_Grouped = unionAll(co_grouped, no2_grouped, o3_grouped)

In [0]:
Spark_Grouped.show(20)

+------------+-----------+-------------------+------------------+--------------------+-------+---------+
|  state_name|county_name|         date_local|          avg(aqi)|avg(arithmetic_mean)|   Date|Pollutant|
+------------+-----------+-------------------+------------------+--------------------+-------+---------+
|    Colorado|   La Plata|2005-05-30 00:00:00|               3.0|             0.25625|2005-05|       co|
|Pennsylvania|    Dauphin|1999-08-08 00:00:00|               5.0| 0.35208300000000003|1999-08|       co|
|   Tennessee|     Loudon|1996-10-20 00:00:00|               3.0|            0.222917|1996-10|       co|
|    Illinois|     Peoria|2003-01-20 00:00:00|              14.0|           0.6208335|2003-01|       co|
|        Utah|     Uintah|2012-12-30 00:00:00|               3.0|              0.2375|2012-12|       co|
|   Minnesota|    Carlton|1990-04-08 00:00:00|               0.0|                 0.0|1990-04|       co|
|Pennsylvania|      Berks|1997-02-07 00:00:00|         

In [0]:
# Let's explore via visualizations (and later regress) using only the AQI field by Date. Note: As AQI is not pollutant-specific, the CO, NO2, and O3 dataframes contain the same exact AQI values on the same dates. As such, below we'll just work off the CO dataframe.
Spark_AQI_OC_co = co_grouped.filter("state_name = 'California' AND county_name = 'Orange'").select('Date', col('avg(arithmetic_mean)').alias('Avg_CO'))
Spark_AQI_OC_no2 = no2_grouped.filter("state_name = 'California' AND county_name = 'Orange'").select('Date', col('avg(arithmetic_mean)').alias('Avg_NO2'))
Spark_AQI_OC_o3 = o3_grouped.filter("state_name = 'California' AND county_name = 'Orange'").select('Date', col('avg(arithmetic_mean)').alias('Avg_O3'))

Spark_Temperature = temp_grouped.filter("state_name = 'California' AND county_name = 'Orange'").select('Date', col('avg(arithmetic_mean)').alias('Temperature_F'))

#Spark_AQI_OC_co.show(50)

In [0]:
Spark_AQI_OC_co.show(100)

+-------+-------------------+
|   Date|             Avg_CO|
+-------+-------------------+
|2013-06|        0.155218375|
|2017-09|0.28120287499999996|
|2011-05|        0.291271375|
|2014-02|0.15772925000000002|
|2004-08|         0.16333775|
|2002-02|         1.66062025|
|2011-03|        0.322115875|
|2006-05|        0.239424875|
|2005-08|        0.139311625|
|2014-12|          0.6632575|
|2005-09|          0.1334465|
|2016-12|          0.3691668|
|2005-07|0.22475074999999997|
|2015-07|0.26007759999999996|
|2006-02| 0.9016531249999999|
|2015-12|0.43585399999999996|
|2002-08|        0.336956375|
|2003-04|        0.789990875|
|2002-06|        0.214198375|
|1997-09| 0.7110733749999999|
|1997-10|        0.475951125|
|1993-12|          1.5249095|
|1995-12|        2.062771875|
|1992-03| 0.7436821250000001|
|1991-08|        0.431385875|
|1990-06|        0.580594375|
|1996-01| 1.6423008749999997|
|1991-03|0.46822925000000004|
|1992-10|        1.845357875|
|1996-01|           1.214923|
|1998-03| 

In [0]:
# Convert Spark df to Pandas df for Plotly plotting / ML, then make Date field the index and remove Date proper field
AQI_OC_co = Spark_AQI_OC_co.toPandas()
AQI_OC_no2 = Spark_AQI_OC_no2.toPandas()
AQI_OC_o3 = Spark_AQI_OC_o3.toPandas()
Temperature = Spark_Temperature.toPandas()

AQI_OC_co = AQI_OC_co.set_index('Date')
AQI_OC_no2 = AQI_OC_no2.set_index('Date')
AQI_OC_o3 = AQI_OC_o3.set_index('Date')
Temperature = Temperature.set_index('Date')

AQI_OC_co.index = pd.to_datetime(AQI_OC_co.index)
AQI_OC_no2.index = pd.to_datetime(AQI_OC_no2.index)
AQI_OC_o3.index = pd.to_datetime(AQI_OC_o3.index)
Temperature.index = pd.to_datetime(Temperature.index)

AQI_OC_co = AQI_OC_co.sort_index()
AQI_OC_no2 = AQI_OC_no2.sort_index()
AQI_OC_o3 = AQI_OC_o3.sort_index()
Temperature = Temperature.sort_index()

# Check that Date field if no longer available as df column
#AQI_OC_co.head(10)

In [0]:
AQI_OC_co.head(10)

Unnamed: 0_level_0,Avg_CO
Date,Unnamed: 1_level_1
1990-01-01,1.899557
1990-01-01,2.890747
1990-01-01,0.847826
1990-01-01,5.33759
1990-01-01,2.832548
1990-01-01,4.3688
1990-01-01,1.945732
1990-01-01,2.063285
1990-01-01,1.81413
1990-01-01,2.268413


In [0]:
# Provide credentials:
plotly.tools.set_credentials_file(username='JavierPlotly', api_key='[EnterAPIKeyHere]')

# Plot time-series of CO levels since 1990
AQI_OC_co.iplot(title="Orange County Average CO Levels (monthly) 1990 to 2018")

In [0]:
AQI_OC_no2.iplot(title="Orange County Average NO2 Levels (monthly) 1990 to 2018")

In [0]:
AQI_OC_o3.iplot(title="Orange County Average O3 Levels (monthly) 1990 to 2018")

In [0]:
Temperature.iplot(title="Orange County Average Temperature (Fahrenheit, monthly) 1990 to 2018")