# Spark 

Distinct monitor count per state, in ascending order, implemented in Python with PySpark.

This notebook demonstrates running a Spark program in Python.
In this example, Spark runs in local mode and reads data from the local filesystem, whereas in cluster mode data is typically read from a distributed file system such as HDFS.

This dataset contains several entries for the same monitor, so the data first had to be deduplicated.
The key used was the FIPS code, a unique number that designates each U.S. state as well as the outlying areas and the freely associated states, such as Guam or Puerto Rico.
The list is available on the U.S. Census Bureau website:
https://www.census.gov/library/reference/code-lists/ansi/ansi-codes-for-states.html


### Download the dataset 
The dataset was uploaded to Dropbox for convenience. This might not be the fastest way to import the data, but since we are using a much-reduced extract of the main dataset, the download shouldn't take too long (around 20 seconds).

In [3]:
!wget -O air_quality.csv https://www.dropbox.com/s/4jxfdsgn2tdo7zo/epa_hap_daily_summary-small.csv?dl=0

--2021-12-23 11:19:41--  https://www.dropbox.com/s/4jxfdsgn2tdo7zo/epa_hap_daily_summary-small.csv?dl=0
Resolving www.dropbox.com (www.dropbox.com)... 162.125.68.18, 2620:100:6024:18::a27d:4412
Connecting to www.dropbox.com (www.dropbox.com)|162.125.68.18|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: /s/raw/4jxfdsgn2tdo7zo/epa_hap_daily_summary-small.csv [following]
--2021-12-23 11:19:55--  https://www.dropbox.com/s/raw/4jxfdsgn2tdo7zo/epa_hap_daily_summary-small.csv
Reusing existing connection to www.dropbox.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://uc427a81e0cf3b7d038c874275e7.dl.dropboxusercontent.com/cd/0/inline/BcY1kA5-Donk0IpfyUr-Cp-XqyIv8SomGxtH9nHBSd79VrP37H9uPCLz6npR5URq0qN7nxCMYJJ-PGtZt60JFSCDb9HcE9pEZJCdPFsCXBdHNBmjl54pQdEXzClCwsX3xmpD_ncDj1mf8wcl9k8Ydrfo/file# [following]
--2021-12-23 11:19:56--  https://uc427a81e0cf3b7d038c874275e7.dl.dropboxusercontent.com/cd/0/inline/BcY1kA5-Donk0IpfyUr-Cp-Xq


Conceptually, the processing is split into two steps:

+ The mapper emits its work to the reducer, and the reducer emits the final output.
+ Here, however, all the calculations are done in a single code cell. Since the result is small, it is printed directly so that it can be inspected right away.
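
The two steps can be sketched in plain Python, without Spark. This is only an illustration of the idea: the sample records and the `mapper` and `reducer` helper names are hypothetical and do not come from the dataset or from the notebook's cells.

```python
from collections import defaultdict

# Hypothetical sample records: (monitor_address, state_name).
# These values are made up for illustration only.
records = [
    ("1 Main St", "Ohio"),
    ("1 Main St", "Ohio"),   # repeated entry for the same monitor
    ("2 Oak Ave", "Ohio"),
    ("9 Pine Rd", "Texas"),
]


def mapper(unique_records):
    """Emit a (state, 1) pair for every distinct monitor."""
    for _monitor, state in unique_records:
        yield state, 1


def reducer(pairs):
    """Sum the emitted ones per state."""
    totals = defaultdict(int)
    for state, one in pairs:
        totals[state] += one
    return dict(totals)


# Deduplicate first, then map and reduce, then sort by count in ascending order.
monitor_counts = reducer(mapper(set(records)))
ranked = sorted(monitor_counts.items(), key=lambda kv: kv[1])
print(ranked)
```

In the Spark cell further down, `distinct()`, `map()` and `reduceByKey()` play the roles of `set()`, `mapper` and `reducer` respectively.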


Starting a cell with "%%file" followed by a file name means that, when the cell is run, its contents are written to that file on the local disk instead of being executed.
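
As a rough illustration, the effect of such a cell is similar to writing the cell body to a file by hand. The sketch below assumes the cell body shown in the next cell and writes to a temporary directory (an assumption made here so that no existing mapper.py is overwritten); the names `cell_body` and `target` are hypothetical.

```python
import tempfile
from pathlib import Path

# The text that follows the "%%file mapper.py" line in the cell.
cell_body = "#!/usr/bin/env python\nimport sys\nimport string\n"

# Write to a temporary directory so this sketch does not touch a real mapper.py.
target = Path(tempfile.mkdtemp()) / "mapper.py"
target.write_text(cell_body)

# The file now holds exactly the cell body; nothing in it has been executed.
print(target.read_text())
```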

In [1]:
%%file mapper.py
#!/usr/bin/env python

# import sys
import sys
# import string library function  
import string  

Overwriting mapper.py


In [2]:
import pyspark

# Start a SparkContext locally
sc = pyspark.SparkContext('local[*]')

try:
    # Read the dataset file that was downloaded above
    rdd = sc.textFile('./air_quality.csv')
    # Drop the first line, which contains the header
    header = rdd.first()
    data = rdd.filter(lambda line: line != header)
    # Drop empty lines to avoid errors when indexing the split fields
    clean_data = data.filter(lambda line: len(line) > 0)
    # The file is comma separated
    split_cleaned_data = clean_data.map(lambda line: line.split(','))
    # Keep only the two columns needed for this analysis (indices 23 and 24)
    working_data = split_cleaned_data.map(lambda line: (line[23], line[24]))
    # Remove repeated pairs so that each monitor is counted only once
    unique_data = working_data.distinct()

    # Keep only the last element of each unique pair (the state name)
    state_data = unique_data.map(lambda pair: pair[-1])
    # Count the monitors per state; the FIPS code could be used as the key
    # instead, but the name column is more readable
    state_count = state_data.map(lambda state: (state, 1))
    distinct_state_data = state_count.reduceByKey(lambda a, b: a + b)

    # Sort in ascending order of count: swap to (count, state), sort by key, swap back
    sorted_state_data = distinct_state_data.map(lambda a: (a[1], a[0])).sortByKey()
    sorted_final = sorted_state_data.map(lambda a: (a[1], a[0]))

    # Print everything, since the result is known to be small
    for k, v in sorted_final.collect():
        print(k, v)

except Exception as err:
    print(err)
finally:
    # Always release the SparkContext, even if the job failed
    sc.stop()


Virgin Islands 4
District Of Columbia 5
Hawaii 5
Nebraska 6
Delaware 6
Puerto Rico 6
North Dakota 7
South Dakota 7
Nevada 8
Wyoming 9
West Virginia 10
Arkansas 10
New Hampshire 12
Utah 12
Rhode Island 12
Alaska 12
Missouri 15
Connecticut 15
Country Of Mexico 16
Maryland 17
New Mexico 17
Idaho 17
Iowa 18
Massachusetts 19
Virginia 20
Oklahoma 20
Mississippi 21
Maine 21
Vermont 21
New Jersey 23
Wisconsin 24
Tennessee 28
Alabama 29
Kentucky 30
Oregon 31
Georgia 34
Kansas 37
Arizona 38
Louisiana 40
Washington 42
Illinois 49
North Carolina 49
Colorado 50
Indiana 52
Florida 57
Montana 60
Pennsylvania 60
South Carolina 64
New York 66
Michigan 83
Ohio 89
Minnesota 94
Texas 132
California 161


In [3]:
sc.stop()
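
The cell above splits each line with `line.split(',')`. That works only as long as no field contains a comma inside quotes; whether this dataset has such fields has not been checked here. As a hedged sketch, Python's standard `csv` module handles quoted fields. The sample line and the `parse_line` helper below are hypothetical.

```python
import csv


def parse_line(line):
    """Parse one CSV line into a list of fields, honouring quoted commas."""
    return next(csv.reader([line]))


# Hypothetical line whose second field contains a comma inside quotes.
line = 'Ohio,"123 Main St, Suite 4",Franklin'

naive_fields = line.split(',')    # the quoted field is cut in two
parsed_fields = parse_line(line)  # the quoted field stays whole

print(len(naive_fields), naive_fields)
print(len(parsed_fields), parsed_fields)
```

If quoted commas do occur in the data, a function like `parse_line` could be passed to `map()` in place of the lambda that calls `split(',')`, so that the column indices stay aligned.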

#### Q2 - Which counties have the best/worst air quality? (Rank counties considering pollutants’ level!)
#### Please run the first line of the notebook to import the csv file and the lines regarding the question you wish to see


In [1]:
# I need a dataset that includes the county column as well as the arithmetic mean values to answer this question

In [82]:
import operator
import pyspark

# Start a SparkContext locally
sc = pyspark.SparkContext('local[*]')

try:
    # Read the dataset file that was downloaded above
    rdd = sc.textFile('./air_quality.csv')
    # Drop the first line, which contains the header
    header = rdd.first()
    data = rdd.filter(lambda line: line != header)
    # Drop empty lines to avoid errors when indexing the split fields
    clean_data = data.filter(lambda line: len(line) > 0)

    # Split each comma-separated line once, then keep the county name (index 25) and
    # the arithmetic mean (index 16) as a float so that the average can be computed
    fields = clean_data.map(lambda line: line.split(','))
    rdd1 = fields.map(lambda f: (f[25], float(f[16])))

    # Number of measurements per county (the denominators), returned to the driver.
    # Not used here: the counts could also be shared with the workers through
    # sc.broadcast(rdd1.countByKey()) and read back via its .value attribute.
    countsByKey = rdd1.countByKey()
    rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerators (i.e. the SUMs).

    rdd1 = rdd1.map(lambda x: (x[0], x[1] / countsByKey[x[0]]))  # Divide each SUM by its denominator (i.e. COUNT)

    # Sort from best to worst, i.e. from the lowest to the highest average
    final_sorted = rdd1.sortBy(lambda line: line[1])

    # Print everything, since the result is known to be small
    for k, v in final_sorted.collect():
        print(k, v)

    # Alternatives that were tried and left unused:
    #print(rdd1.collect())
    #final = working_data.mapValues(lambda v: (v, 1)).reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))
    #avg_by_key = final.mapValues(lambda v: (v, 1)).reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])).mapValues(lambda v: v[0]/v[1]).collectAsMap()
    #working_data.groupByKey().mapValues(lambda x: sum(x) / len(x)).collect()
    #final_data = working_data.groupBy("c1").mean("c2").show(truncate=False)

except Exception as err:
    print(err)
finally:
    # Always release the SparkContext, even if the job failed
    sc.stop()

    

Sweet Grass 0.0
Martin 0.0
Wrangell Petersburg 4.5359477124183e-05
Northwest Arctic 6.333333333333333e-05
Aleutians East 0.00010957142857142859
Eagle 0.00011627906976744187
Matanuska-Susitna 0.00012758855585831067
Kenai Peninsula 0.00014010416666666666
Yukon-Koyukuk 0.00015384615384615385
Lewis 0.00015738396624472573
Rio Blanco 0.00015972222222222223
Maui 0.00017128700128700135
Hawaii 0.00017388581952117868
Josephine 0.00017785106382978727
Denali 0.0001861187845303868
Okanogan 0.0001986374133949192
Siskiyou 0.00021453333333333336
Del Norte 0.00021479999999999996
Powell 0.0002212528473804101
Sublette 0.00023102535832414555
Sanders 0.00023128099173553722
Clackamas 0.0002386721991701245
Taos 0.00024844748858447496
Rosebud 0.0002500240384615386
Rio Arriba 0.00025866972477064226
Trinity 0.0002622997416020672
White Pine 0.00026566423357664244
Pitkin 0.00028215503875968993
Teton 0.00028398868458274395
Mono 0.0002847272727272728
Clallam 0.00029043897216274106
Kittitas 0.00029143556280587275
Sh
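
The cell above computes each average from two separate results: a sum per county and a count per county. The same average can be obtained in one pass by carrying a (sum, count) pair per key. The plain-Python sketch below illustrates that idea only; the measurements and the `combine` helper are hypothetical and do not come from the dataset.

```python
# Hypothetical (county, arithmetic_mean) measurements, made up for illustration.
measurements = [("Maui", 0.25), ("Maui", 0.75), ("Teton", 0.125)]


def combine(a, b):
    """Merge two (sum, count) pairs into one."""
    return (a[0] + b[0], a[1] + b[1])


# Fold every measurement into a running (sum, count) pair per county.
totals = {}
for county, value in measurements:
    pair = (value, 1)
    totals[county] = combine(totals[county], pair) if county in totals else pair

# Divide each sum by its count, then rank from lowest to highest average.
averages = {county: s / n for county, (s, n) in totals.items()}
ranked = sorted(averages.items(), key=lambda kv: kv[1])
print(ranked)
```

On an RDD, the corresponding chain would presumably be `mapValues(lambda v: (v, 1))`, then `reduceByKey(combine)`, then `mapValues(lambda p: p[0] / p[1])`. That chain has not been run against this dataset.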

#### Q3 - Which states have the best/worst air quality in each year? (Rank states per year considering pollutants’ level!)
#### Please run the first line of the notebook to import the csv file and the lines regarding the question you wish to see

In [1]:
# I need a dataset that includes the state column as well as the arithmetic mean and the corresponding dates

In [51]:
import operator
import pyspark

# Start a SparkContext locally
sc = pyspark.SparkContext('local[*]')

try:
    # Read the dataset file that was downloaded above
    rdd = sc.textFile('./air_quality.csv')
    # Drop the header line; an RDD of text lines has no structured, named columns
    header = rdd.first()
    data = rdd.filter(lambda line: line != header)
    # Drop empty lines to avoid errors when indexing the split fields
    clean_data = data.filter(lambda line: len(line) > 0)

    # Split each comma-separated line once and keep (year, state name, arithmetic mean).
    # The date is in YYYY-MM-DD format, so the year is the part before the first '-'.
    # The arithmetic mean is converted to float so that the average can be computed.
    fields = clean_data.map(lambda line: line.split(','))
    split_cleaned_data = fields.map(lambda f: (f[11].split('-')[0], f[24], float(f[16])))
    # Join year and state into a single key, useful for averaging and sorting later on
    rdd1 = split_cleaned_data.map(lambda line: (line[0] + line[1], line[2]))

    # Number of measurements per year+state key (the denominators).
    # This has to be computed before rdd1 is overwritten with the sums below.
    # Not used here: the counts could also be shared through sc.broadcast(rdd1.countByKey()).
    countsByKey = rdd1.countByKey()
    rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerators (i.e. the SUMs).

    rdd1 = rdd1.map(lambda x: (x[0], x[1] / countsByKey[x[0]]))  # Divide each SUM by its denominator (i.e. COUNT)

    # Sort by year (the first four characters of the key), then from best to worst,
    # i.e. from the lowest to the highest average
    final_sorted = rdd1.sortBy(lambda line: (line[0][0:4], line[1]))

    # Print everything, since the result is known to be small
    for k, v in final_sorted.collect():
        print(k, v)

except Exception as err:
    print(err)
finally:
    # Always release the SparkContext, even if the job failed
    sc.stop()


1990West Virginia 0.0
1990Virgin Islands 0.0
1990Oklahoma 0.0
1990Wisconsin 0.0
1990Hawaii 0.00019703703703703704
1990Nevada 0.00042080000000000004
1990Alaska 0.00044208333333333334
1990South Dakota 0.0005705
1990Washington 0.0005974999999999999
1990Wyoming 0.0006045454545454545
1990Utah 0.0007970588235294118
1990New Mexico 0.0008222222222222222
1990Oregon 0.0008596296296296297
1990Arizona 0.0008620134228187919
1990Maine 0.0009789285714285713
1990Colorado 0.0021623741007194244
1990Mississippi 0.0026666666666666666
1990Missouri 0.0056
1990Michigan 0.006559896373056996
1990Connecticut 0.0081
1990Georgia 0.008366666666666666
1990Puerto Rico 0.01005
1990North Carolina 0.0143
1990Alabama 0.024325
1990Iowa 0.0332
1990Pennsylvania 0.1059625
1990Ohio 0.135716
1990Illinois 0.14575701219512197
1990New Jersey 0.2933352941176471
1990Minnesota 0.306488
1990California 0.41153099836333895
1990South Carolina 0.5598140350877193
1990District Of Columbia 0.8261508196721312
1990Virginia 0.8451416666666666

In [33]:
sc.stop()
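
The Q3 cell joins year and state into one string key such as "1990Ohio" and recovers the year by slicing its first four characters. A (year, state) tuple would keep both parts separate and make a per-year ranking easier to build. The sketch below shows this in plain Python; the averages and the names `yearly_means` and `rankings` are hypothetical and do not come from the dataset.

```python
from itertools import groupby

# Hypothetical ((year, state), average) results, made up for illustration.
yearly_means = [
    (("1991", "Ohio"), 0.3),
    (("1990", "Utah"), 0.2),
    (("1990", "Ohio"), 0.1),
    (("1991", "Utah"), 0.05),
]

# Sort by year first, then by average from best (lowest) to worst (highest).
ordered = sorted(yearly_means, key=lambda kv: (kv[0][0], kv[1]))

# Group the sorted rows by year and list the states in rank order.
rankings = {}
for year, group in groupby(ordered, key=lambda kv: kv[0][0]):
    rankings[year] = [state for (_year, state), _mean in group]

for year, states in rankings.items():
    for rank, state in enumerate(states, start=1):
        print(year, rank, state)
```

Tuples can also serve as keys for `reduceByKey`, so the same structure could be used in the Spark cell in place of the concatenated string.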