# Analyze air quality summary statistics by year at the US county level
**Team members:**
<br>[Javier Orraca](https://javorraca.github.io/Home/)
<br>[Yiting Fu](https://github.com/Yiting2018)
<br>[Mark Planas](https://github.com/markplanas)

Research Questions:
  * How US air quality changes through time at the county level;
  * What factors could cause these changes (potentially researching external data sets for understanding "why" US air quality got better or worse, etc.)

In [168]:
# Import all the useful packages, note that pyspark needs to be install locally first.
import numpy as np
import pandas as pd
import sys
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [None]:
from google.cloud import bigquery
import google.cloud.bigquery as bq

# Download the file into Google Colab tmp folder without affecting RAM limitation of Colab Python environment
!gsutil cp gs://bana295_epa_analysis/co_daily_summary-000000000000.csv /tmp/co_0.csv
!gsutil cp gs://bana295_epa_analysis/co_daily_summary-000000000001.csv /tmp/co_1.csv
!gsutil cp gs://bana295_epa_analysis/co_daily_summary-000000000002.csv /tmp/co_2.csv
!gsutil cp gs://bana295_epa_analysis/no2_daily_summary-000000000000.csv /tmp/no2_0.csv
!gsutil cp gs://bana295_epa_analysis/no2_daily_summary-000000000001.csv /tmp/no2_1.csv
!gsutil cp gs://bana295_epa_analysis/no2_daily_summary-000000000002.csv /tmp/no2_2.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000000.csv /tmp/o3_0.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000001.csv /tmp/o3_1.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000002.csv /tmp/o3_2.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000003.csv /tmp/o3_3.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000004.csv /tmp/o3_4.csv
!gsutil cp gs://bana295_epa_analysis/o3_daily_summary-000000000005.csv /tmp/o3_5.csv
!gsutil cp gs://bana295_epa_analysis/temperature_daily_summary-000000000000.csv /tmp/temp_0.csv
!gsutil cp gs://bana295_epa_analysis/temperature_daily_summary-000000000001.csv /tmp/temp_1.csv
!gsutil cp gs://bana295_epa_analysis/temperature_daily_summary-000000000002.csv /tmp/temp_2.csv
!gsutil cp gs://bana295_epa_analysis/temperature_daily_summary-000000000003.csv /tmp/temp_3.csv
!gsutil cp gs://bana295_epa_analysis/temperature_daily_summary-000000000004.csv /tmp/temp_4.csv

### After gathering data from Google Cloud Storage buckets, we input them using spark

In [169]:
co_0 = spark.read.csv('co_daily_summary-000000000000.csv', inferSchema = True, header = True)
co_1 = spark.read.csv('co_daily_summary-000000000001.csv', inferSchema = True, header = True)
co_2 = spark.read.csv('co_daily_summary-000000000002.csv', inferSchema = True, header = True)
no2_0 = spark.read.csv('no2_daily_summary-000000000000.csv', inferSchema = True, header = True)
no2_1 = spark.read.csv('no2_daily_summary-000000000001.csv', inferSchema = True, header = True)
no2_2 = spark.read.csv('no2_daily_summary-000000000002.csv', inferSchema = True, header = True)
o3_0 = spark.read.csv('o3_daily_summary-000000000000.csv', inferSchema = True, header = True)
o3_1 = spark.read.csv('o3_daily_summary-000000000001.csv', inferSchema = True, header = True)
o3_2 = spark.read.csv('o3_daily_summary-000000000002.csv', inferSchema = True, header = True)
o3_3 = spark.read.csv('o3_daily_summary-000000000003.csv', inferSchema = True, header = True)
o3_4 = spark.read.csv('o3_daily_summary-000000000004.csv', inferSchema = True, header = True)
o3_5 = spark.read.csv('o3_daily_summary-000000000005.csv', inferSchema = True, header = True)
o3_6 = spark.read.csv('o3_daily_summary-000000000006.csv', inferSchema = True, header = True)
temp_0 = spark.read.csv('temperature_daily_summary-000000000000.csv', inferSchema = True, header = True)
temp_1 = spark.read.csv('temperature_daily_summary-000000000001.csv', inferSchema = True, header = True)
temp_2 = spark.read.csv('temperature_daily_summary-000000000002.csv', inferSchema = True, header = True)
temp_3 = spark.read.csv('temperature_daily_summary-000000000003.csv', inferSchema = True, header = True)
temp_4 = spark.read.csv('temperature_daily_summary-000000000004.csv', inferSchema = True, header = True)

In [170]:
from functools import reduce
from pyspark.sql import DataFrame

In [171]:
# Create a function to union all the sub-datasets.
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

In [172]:
co = unionAll(co_0, co_1, co_2)
no2 = unionAll(no2_0, no2_1, no2_2)
o3 = unionAll(o3_0, o3_1, o3_2, o3_3, o3_4, o3_5, o3_6)
temp = unionAll(temp_0, temp_1, temp_2, temp_3, temp_4)

In [173]:
co.show() # See the details of the dataset "co"

+----------+-----------+--------+--------------+---+---------+-----------+-----+---------------+--------------------+------------------+-------------------+-----------------+----------+-----------------+-------------------+-------------------+---------------+--------------+----+-----------+--------------------+--------------------+--------------------+-------------+------------+---------------+--------------------+-------------------+
|state_code|county_code|site_num|parameter_code|poc| latitude|  longitude|datum| parameter_name|     sample_duration|pollutant_standard|         date_local| units_of_measure|event_type|observation_count|observation_percent|    arithmetic_mean|first_max_value|first_max_hour| aqi|method_code|         method_name|     local_site_name|             address|   state_name| county_name|      city_name|           cbsa_name|date_of_last_change|
+----------+-----------+--------+--------------+---+---------+-----------+-----+---------------+--------------------+-----

In [174]:
from pyspark.sql.functions import year, month, mean
from pyspark.sql.functions import col
from pyspark.sql.functions import date_format

In [175]:
co = co.withColumn("date", date_format('date_local', "yyyy-MM"))
no2 = no2.withColumn("date", date_format('date_local', "yyyy-MM"))
o3 = o3.withColumn("date", date_format('date_local', "yyyy-MM"))
temp = temp.withColumn("date", date_format('date_local', "yyyy-MM"))

#### Get the average numbers of each months for each counties.

In [176]:
co_1 = co.groupBy('state_name', 'county_name', 'date').avg('aqi', 'arithmetic_mean').select('state_name', 'county_name', col("avg(aqi)").alias("Avg_AQI"), col("avg(arithmetic_mean)").alias("Avg_Value"), 'date')

In [198]:
co_1.sort(col('Avg_AQI').desc()).show()

+-----------------+--------------------+-----------------+------------------+-------+
|       state_name|         county_name|          Avg_AQI|         Avg_Value|   date|
+-----------------+--------------------+-----------------+------------------+-------+
|Country Of Mexico|BAJA CALIFORNIA N...| 98.1556886227545| 3.555596183183184|1999-11|
|       California|            Imperial|89.29032258064517|3.1247022419354837|1995-12|
|Country Of Mexico|BAJA CALIFORNIA N...|82.22489959839358|3.2678619016064268|2000-12|
|Country Of Mexico|BAJA CALIFORNIA N...|80.85507246376811| 3.175846101449275|2002-02|
|Country Of Mexico|BAJA CALIFORNIA N...| 80.1673640167364|3.1400600020920497|1998-01|
|Country Of Mexico|     CHIHUAHUA STATE|77.33333333333333|2.7795186916666665|1998-11|
|       California|            Imperial|             76.9| 2.721872316666667|1995-11|
|Country Of Mexico|BAJA CALIFORNIA N...|76.76129032258065| 3.054804090322581|1999-12|
|Country Of Mexico|BAJA CALIFORNIA N...| 76.0460251046

In [178]:
no2_1 = no2.groupBy('state_name', 'county_name', 'date').avg('aqi', 'arithmetic_mean').select('state_name', 'county_name', col("avg(aqi)").alias("Avg_AQI"), col("avg(arithmetic_mean)").alias("Avg_Value"), 'date')

In [199]:
no2_1.sort(col('Avg_AQI').desc()).show()

+------------+-----------+-----------------+------------------+-------+
|  state_name|county_name|          Avg_AQI|         Avg_Value|   date|
+------------+-----------+-----------------+------------------+-------+
|        Utah|      Davis|            108.0|        89.1674242|1992-01|
|    Colorado|     Denver|95.51612903225806| 70.33737096774195|1992-12|
|    Colorado|   Arapahoe|             93.0|              56.0|1991-01|
|    Colorado|     Denver|             91.0| 64.61837566666667|1995-02|
|      Kansas|  Wyandotte|87.19354838709677| 29.48241835483871|1996-01|
|      Kansas|  Wyandotte|84.64516129032258|26.137463161290324|1995-12|
|  California|Los Angeles|83.82629107981221| 56.62685396948358|1995-11|
|Pennsylvania|   Lawrence|82.93103448275862|50.652457827586204|1996-01|
|      Kansas|  Wyandotte| 82.2258064516129|25.061094838709675|1995-10|
|  California|Los Angeles|            81.85| 53.35674523809524|1991-02|
|      Kansas|  Wyandotte|81.53333333333333|        24.1362812|1

In [180]:
no2_1.describe()

DataFrame[summary: string, state_name: string, county_name: string, Avg_AQI: string, Avg_Value: string, date: string]

In [181]:
o3_1 = o3.groupBy('state_name', 'county_name', 'date').avg('aqi', 'arithmetic_mean').select('state_name', 'county_name', col("avg(aqi)").alias("Avg_AQI"), col("avg(arithmetic_mean)").alias("Avg_Value"), 'date')

In [201]:
o3_1.sort(col('Avg_AQI').desc()).show(50)

+----------+--------------+------------------+--------------------+-------+
|state_name|   county_name|           Avg_AQI|           Avg_Value|   date|
+----------+--------------+------------------+--------------------+-------+
|California|San Bernardino|168.08709677419355| 0.06535911935483871|1990-07|
|California|San Bernardino| 167.4891304347826| 0.06951167663043477|1994-07|
|   Georgia|        Fulton|166.48387096774192| 0.06242812903225806|1999-08|
|California|San Bernardino|165.83136094674558| 0.06908530088495575|1991-07|
|California|San Bernardino|163.89036544850498| 0.06767630564784052|1991-08|
|California|San Bernardino|162.30100334448161| 0.06409439130434783|1993-08|
|California|         Kings|161.51612903225808| 0.06733396774193548|1996-07|
|California|         Kings| 161.2258064516129| 0.06518741935483871|1996-08|
|California|San Bernardino|159.39506172839506| 0.06709643209876544|1994-06|
|     Texas|         Ellis|             159.0| 0.07200000000000001|1997-08|
|   Georgia|

In [183]:
temp.dtypes # We noticed the datatype for aqi in temp is string

[('state_code', 'int'),
 ('county_code', 'int'),
 ('site_num', 'int'),
 ('parameter_code', 'int'),
 ('poc', 'int'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('datum', 'string'),
 ('parameter_name', 'string'),
 ('sample_duration', 'string'),
 ('pollutant_standard', 'string'),
 ('date_local', 'timestamp'),
 ('units_of_measure', 'string'),
 ('event_type', 'string'),
 ('observation_count', 'int'),
 ('observation_percent', 'int'),
 ('arithmetic_mean', 'double'),
 ('first_max_value', 'double'),
 ('first_max_hour', 'int'),
 ('aqi', 'string'),
 ('method_code', 'int'),
 ('method_name', 'string'),
 ('local_site_name', 'string'),
 ('address', 'string'),
 ('state_name', 'string'),
 ('county_name', 'string'),
 ('city_name', 'string'),
 ('cbsa_name', 'string'),
 ('date_of_last_change', 'timestamp'),
 ('date', 'string')]

In [184]:
temp = temp.withColumn("aqi", temp["aqi"].cast("double"))

In [185]:
temp_1 = temp.groupBy('state_name', 'county_name', 'date').avg('aqi', 'arithmetic_mean').select('state_name', 'county_name', col("avg(aqi)").alias("Avg_AQI"), col("avg(arithmetic_mean)").alias("Avg_Value"), 'date')

In [202]:
temp_1.sort(col('Avg_AQI').desc()).show()

+--------------+---------------+-------+------------------+-------+
|    state_name|    county_name|Avg_AQI|         Avg_Value|   date|
+--------------+---------------+-------+------------------+-------+
|      Maryland|Prince George's|   null| 69.23082007407407|2004-05|
|         Texas|           Bell|   null| 64.58736564516128|2009-10|
|  North Dakota|       Williams|   null| 57.62150668888889|2016-09|
|     Tennessee|         Shelby|   null| 79.65236116666667|2016-06|
|         Texas|        Hidalgo|   null| 86.83086488709675|2016-08|
|         Texas|      Robertson|   null| 53.44758058064515|2016-12|
|  North Dakota|       Williams|   null| 27.26819924137931|2016-02|
|   Connecticut|      New Haven|   null|46.990727763440866|2012-03|
|      Oklahoma|       Sequoyah|   null| 68.06182809677419|2016-10|
|  Pennsylvania|   Philadelphia|   null| 69.15833323333332|2009-09|
|         Idaho|      Nez Perce|   null| 53.39913195833333|2012-04|
|         Texas|        Kleberg|   null| 79.6387

In [187]:
co.groupBy(year('date_local')).avg('aqi', 'arithmetic_mean').select(col("year(date_local)").alias("Year"), col("avg(arithmetic_mean)").alias("CO"), col("avg(aqi)").alias("AQI")).sort('Year').show(28)

+----+-------------------+------------------+
|Year|                 CO|               AQI|
+----+-------------------+------------------+
|1990| 1.1257721687540605| 20.38121590164033|
|1991| 1.0903920747257416|19.506955607034797|
|1992| 1.0394011228738278|18.588074662591165|
|1993| 0.9824400527254451|17.656195163942566|
|1994| 0.9796976413680244| 17.48246121658617|
|1995| 0.9192320024887064| 16.24126289193104|
|1996| 0.8680204554714104| 15.36029527762516|
|1997| 0.8221655785795299|14.726901853122856|
|1998| 0.8201308902930613|14.521619679380873|
|1999| 0.7912135131090428|14.103188925565087|
|2000| 0.7057158014704847|12.536615192105337|
|2001| 0.6680985612693261|11.821621459971889|
|2002| 0.6227662921090243|10.889929438782124|
|2003| 0.6001401910201596|10.353958326582196|
|2004| 0.5464600578648561| 9.351399630822973|
|2005| 0.5118752707715917| 8.665479086019664|
|2006|0.48120536668885155| 8.130937671358069|
|2007| 0.4308632387277576| 7.139530777380276|
|2008| 0.3939572504598595|6.468511

In [188]:
no2.groupBy(year('date_local')).avg('aqi', 'arithmetic_mean').select(col("year(date_local)").alias("Year"), col("avg(arithmetic_mean)").alias("No2"), col("avg(aqi)").alias("AQI")).sort('Year').show(28)

+----+------------------+------------------+
|Year|               No2|               AQI|
+----+------------------+------------------+
|1990| 18.52854638343699| 32.52072401285294|
|1991|18.165256745162626|31.846556626673138|
|1992|17.468983301706015|30.498500070633792|
|1993| 17.24469456443412|30.218031146162776|
|1994| 17.72709782603055|30.934833743601764|
|1995|16.988859075288566|29.945123863280024|
|1996| 16.39557998331853|29.102841667847898|
|1997|15.733826309318225| 28.24878306725002|
|1998|15.925665648994197|  28.4278327082623|
|1999| 16.39755906789483| 29.50165519675313|
|2000|15.332295421908501| 27.49152776149459|
|2001|14.932344301206294|26.929423323599245|
|2002|14.264765278534401|25.932616477716568|
|2003| 13.68065607855724|25.004868025484733|
|2004| 12.72996048592729|23.438541666666666|
|2005|12.781965128653962|23.752543985439303|
|2006|11.906634627097018| 22.77588371141456|
|2007|11.489549252984554|21.901891286673987|
|2008|10.883111762023308|21.095122657211085|
|2009| 9.8

In [189]:
o3.groupBy(year('date_local')).avg('aqi', 'arithmetic_mean').select(col("year(date_local)").alias("Year"), col("avg(arithmetic_mean)").alias("O3"), col("avg(aqi)").alias("AQI")).sort('Year').show(28)

+----+--------------------+------------------+
|Year|                  O3|               AQI|
+----+--------------------+------------------+
|1990| 0.02917773120616503| 45.05635765627139|
|1991| 0.02989380171799576|46.370947663364355|
|1992|0.028390782745594402|42.611522545361105|
|1993|0.029188639536002058| 44.17079964194033|
|1994|0.030151296359536835|45.871962610002775|
|1995| 0.03088213020502843| 47.64446683110104|
|1996|  0.0307704619573374| 46.15407922992543|
|1997| 0.03107154760433678| 46.13850354318614|
|1998| 0.03271064695459362|50.668427416788646|
|1999| 0.03308111044242739|  50.9121987186513|
|2000|0.031090646807187053|46.085598109292874|
|2001| 0.03176177463570149| 47.21249484446567|
|2002|0.032321832760752296| 48.67900222046043|
|2003| 0.03156020554612209|45.773150694760005|
|2004|0.030649765259600153| 42.48108872708864|
|2005|0.032092882382268206| 46.26232580490149|
|2006| 0.03256116669385342|45.734867650329946|
|2007| 0.03296977575061558| 46.21611753480832|
|2008|0.03244

In [190]:
temp.groupBy(year('date_local')).avg('aqi', 'arithmetic_mean').select(col("year(date_local)").alias("Year"), col("avg(arithmetic_mean)").alias("Temp"), col("avg(aqi)").alias("AQI")).sort('Year').show(28)

+----+------------------+----+
|Year|              Temp| AQI|
+----+------------------+----+
|1990|53.744737512403354|null|
|1991|54.387546912695974|null|
|1992|53.397466403140676|null|
|1993| 53.34523828503471|null|
|1994|54.547137106299466|null|
|1995| 55.64843882630052|null|
|1996| 54.87537081299792|null|
|1997| 55.65061910091302|null|
|1998|57.381626671952866|null|
|1999| 57.27393747238396|null|
|2000|56.730839424663934|null|
|2001|57.211862974796304|null|
|2002|57.088377579037264|null|
|2003| 57.05777696447241|null|
|2004| 56.86720974468335|null|
|2005| 57.60807172813541|null|
|2006| 57.96782123855886|null|
|2007| 57.18167062015973|null|
|2008| 55.83212670772041|null|
|2009| 55.56795376874028|null|
|2010|55.945239018177126|null|
|2011|  55.7138652164867|null|
|2012| 57.49611659432808|null|
|2013|  55.0340702014359|null|
|2014| 55.38748289461968|null|
|2015| 56.48320160613883|null|
|2016| 57.48171305492431|null|
|2017| 57.14234382450479|null|
+----+------------------+----+



## Export all the dataframe to csv files so we can work on these in Tableau with the state code and county code


In [133]:
co_tab = co.groupBy('state_name', 'county_name', 'state_code', 'county_code', 'date').avg('aqi', 'arithmetic_mean').select('state_name', 'county_name', 'state_code', 'county_code', col("avg(aqi)").alias("Avg_AQI"), col("avg(arithmetic_mean)").alias("Avg_Value"), 'date')

In [16]:
co_tab.toPandas().to_csv('co_tab.csv')

In [17]:
no2_tab = no2.groupBy('state_name', 'county_name', 'state_code', 'county_code', 'date').avg('aqi', 'arithmetic_mean').select('state_name', 'county_name', 'state_code', 'county_code', col("avg(aqi)").alias("Avg_AQI"), col("avg(arithmetic_mean)").alias("Avg_Value"), 'date')

In [18]:
no2_tab.toPandas().to_csv('no2_tab.csv')

In [19]:
o3_tab = o3.groupBy('state_name', 'county_name', 'state_code', 'county_code', 'date').avg('aqi', 'arithmetic_mean').select('state_name', 'county_name', 'state_code', 'county_code', col("avg(aqi)").alias("Avg_AQI"), col("avg(arithmetic_mean)").alias("Avg_Value"), 'date')

In [20]:
o3_tab.toPandas().to_csv('o3_tab.csv')

In [21]:
temp_tab = temp.groupBy('state_name', 'county_name', 'state_code', 'county_code', 'date').avg('aqi', 'arithmetic_mean').select('state_name', 'county_name', 'state_code', 'county_code', col("avg(aqi)").alias("Avg_AQI"), col("avg(arithmetic_mean)").alias("Avg_Value"), 'date')

In [22]:
temp_tab.toPandas().to_csv('temp_tab.csv')