## Requirements
1. Apache Spark binary (https://spark.apache.org/)
2. For Windows: winutils (https://medium.com/@dvainrub/how-to-install-apache-spark-2-x-in-your-pc-e2047246ffc3)
3. The ```JAVA_HOME```, ```SPARK_HOME```, and ```HADOOP_HOME``` environment variables set
4. Python 3.x (from Anaconda distribution)
5. ```findspark``` (https://pypi.org/project/findspark/)
6. Jupyter Notebook (available from Anaconda installation)
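
Before starting the notebook, it can help to confirm that the three environment variables from requirement 3 are actually visible to Python. The following is a minimal sketch using only the standard library; the helper name `missing_env_vars` is hypothetical and not part of Spark or findspark.

```python
import os

# Variables named in the requirements list above
REQUIRED_VARS = ("JAVA_HOME", "SPARK_HOME", "HADOOP_HOME")


def missing_env_vars(names=REQUIRED_VARS, environ=None):
    """Return the names that are unset or empty in the given environment."""
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name)]


missing = missing_env_vars()
if missing:
    print("Not set:", ", ".join(missing))
else:
    print("All required variables are set.")
```

If anything is reported as not set, fix it in the shell or system settings and restart Jupyter so the kernel picks up the new values.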

### References
https://spark.apache.org/docs/2.3.3/sql-programming-guide.html

## Spark Initialization

In [1]:
# Use findspark to locate the Spark installation (via SPARK_HOME) and make pyspark importable
import findspark
findspark.init()

In [2]:
# Import required library
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [3]:
# Print the SparkSession object to confirm it was created
print(spark)

<pyspark.sql.session.SparkSession object at 0x10b22bba8>


## Loading Data using Spark

In [4]:
df = spark.read.csv("/Users/gunstringer/Downloads/epa_hap_daily_summary.csv", header=True, inferSchema=True)

In [5]:
df.show()

+----------+-----------+--------+--------------+---+---------+-----------+-------+--------------------+---------------+------------------+-------------------+--------------------+----------+-----------------+-------------------+---------------+---------------+--------------+----+-----------+--------------------+--------------------+--------------------+------------+----------------+-------------+--------------------+-------------------+
|state_code|county_code|site_num|parameter_code|poc| latitude|  longitude|  datum|      parameter_name|sample_duration|pollutant_standard|         date_local|    units_of_measure|event_type|observation_count|observation_percent|arithmetic_mean|first_max_value|first_max_hour| aqi|method_code|         method_name|     local_site_name|             address|  state_name|     county_name|    city_name|           cbsa_name|date_of_last_change|
+----------+-----------+--------+--------------+---+---------+-----------+-------+--------------------+---------------

In [126]:
df.printSchema()

root
 |-- state_code: integer (nullable = true)
 |-- county_code: integer (nullable = true)
 |-- site_num: integer (nullable = true)
 |-- parameter_code: integer (nullable = true)
 |-- poc: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- datum: string (nullable = true)
 |-- parameter_name: string (nullable = true)
 |-- sample_duration: string (nullable = true)
 |-- pollutant_standard: string (nullable = true)
 |-- date_local: timestamp (nullable = true)
 |-- units_of_measure: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- observation_count: integer (nullable = true)
 |-- observation_percent: double (nullable = true)
 |-- arithmetic_mean: double (nullable = true)
 |-- first_max_value: double (nullable = true)
 |-- first_max_hour: integer (nullable = true)
 |-- aqi: string (nullable = true)
 |-- method_code: integer (nullable = true)
 |-- method_name: string (nullable = true)
 |-- local_site_name: st

In [8]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("pollution")

In [29]:
row_count = spark.sql("SELECT COUNT(1) FROM pollution")

In [30]:
row_count.show()

+--------+
|count(1)|
+--------+
| 8097069|
+--------+



In [9]:
result = spark.sql("SELECT DISTINCT parameter_name FROM pollution")

In [10]:
result.show()

+--------------------+
|      parameter_name|
+--------------------+
|             Benzene|
|Chromium VI (TSP) LC|
|  12-Dichloropropane|
|Acrolein - Unveri...|
| Tetrachloroethylene|
|       Lead PM10 STP|
|     Nickel PM10 STP|
|    Nickel (TSP) STP|
|   Chromium PM10 STP|
|  Ethylene dibromide|
|    Mercury PM10 STP|
|  Chromium (TSP) STP|
|cis-13-Dichloropr...|
|      Vinyl chloride|
|   Trichloroethylene|
|       Lead PM2.5 LC|
|          Chloroform|
|  Manganese PM10 STP|
|        Acetaldehyde|
|  Beryllium PM2.5 LC|
+--------------------+
only showing top 20 rows



## Data Mining Process

In [65]:
# Find the air pollutants with the highest average concentration in a given state (e.g. Pennsylvania)

query1 = spark.sql("""
    SELECT parameter_name, AVG(arithmetic_mean), units_of_measure
    FROM pollution
    WHERE state_name = 'Pennsylvania'
    GROUP BY parameter_name, units_of_measure
    ORDER BY AVG(arithmetic_mean) DESC
""")

In [66]:
query1.show()

+--------------------+--------------------+--------------------+
|      parameter_name|avg(arithmetic_mean)|    units_of_measure|
+--------------------+--------------------+--------------------+
|        Formaldehyde|  2.6133003936669663|Parts per billion...|
|             Benzene|  2.2339411705663816|Parts per billion...|
|        Acetaldehyde|  2.0236999796918775|Parts per billion...|
|   Trichloroethylene| 0.24696496444731733|Parts per billion...|
|     Dichloromethane|  0.2274077303487766|Parts per billion...|
|Acrolein - Unveri...| 0.22246105104761915|Parts per billion...|
| Tetrachloroethylene| 0.20841215255332896|Parts per billion...|
|        13-Butadiene| 0.20047134502923972|Parts per billion...|
|Carbon tetrachloride| 0.09884115055713964|Parts per billion...|
|          Chloroform| 0.03560626050963646|Parts per billion...|
| Beryllium (TSP) STP| 0.02544813278008298|Nanograms/cubic m...|
| Manganese (TSP) STP|0.025297648530331437|Micrograms/cubic ...|
| Ethylene dichloride|0.0
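
The filter, group, average, and sort pattern used in this query can be checked on a tiny table without starting Spark. The sketch below uses the standard-library `sqlite3` module as a stand-in for Spark SQL; the four rows are invented for illustration and are not taken from the EPA dataset.

```python
import sqlite3

# Toy rows invented for illustration only; NOT real EPA measurements.
rows = [
    ("Pennsylvania", "Benzene", 2.0, "ppb"),
    ("Pennsylvania", "Benzene", 4.0, "ppb"),
    ("Pennsylvania", "Chloroform", 1.0, "ppb"),
    ("Texas", "Benzene", 9.0, "ppb"),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pollution ("
    "state_name TEXT, parameter_name TEXT, "
    "arithmetic_mean REAL, units_of_measure TEXT)"
)
conn.executemany("INSERT INTO pollution VALUES (?, ?, ?, ?)", rows)

# Same shape as the Spark SQL query: keep one state, average per pollutant
# and unit, then sort with the largest average first.
result = conn.execute(
    """
    SELECT parameter_name, AVG(arithmetic_mean), units_of_measure
    FROM pollution
    WHERE state_name = 'Pennsylvania'
    GROUP BY parameter_name, units_of_measure
    ORDER BY AVG(arithmetic_mean) DESC
    """
).fetchall()
conn.close()

for row in result:
    print(row)
```

In the real output above the rows carry different units (parts per billion, nanograms and micrograms per cubic meter), so a ranking across units should be read with care.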

In [75]:
# Find the states with the highest average concentration of a given air pollutant (e.g. Benzene)

query2 = spark.sql("""
    SELECT state_name, AVG(arithmetic_mean), units_of_measure
    FROM pollution
    WHERE parameter_name = 'Benzene'
    GROUP BY state_name, units_of_measure
    ORDER BY AVG(arithmetic_mean) DESC
""")

In [76]:
query2.show()

+-----------------+--------------------+--------------------+
|       state_name|avg(arithmetic_mean)|    units_of_measure|
+-----------------+--------------------+--------------------+
|Country Of Mexico|   8.708797468354431|Parts per billion...|
|          Alabama|   6.430967802146523|Parts per billion...|
|           Kansas|   5.406358490566038|Parts per billion...|
|           Alaska|   4.649691056910569|Parts per billion...|
|       New Mexico|  4.2419974639498434|Parts per billion...|
|         Missouri|  4.0470664072806475|Parts per billion...|
|         Colorado|   3.530716491661107|Parts per billion...|
|          Arizona|  3.2813660599781884|Parts per billion...|
|            Texas|   3.165615802749503|Parts per billion...|
|         Arkansas|  3.1010606060606065|Parts per billion...|
|             Utah|   3.088827945776851|Parts per billion...|
|      Puerto Rico|   3.016304182509505|Parts per billion...|
|       California|  2.9796335305933703|Parts per billion...|
|       

In [119]:
# Average concentration of each pollutant, year by year, across the US

query3 = spark.sql("""
    SELECT EXTRACT(YEAR FROM date_local), parameter_name, AVG(arithmetic_mean), units_of_measure
    FROM pollution
    GROUP BY EXTRACT(YEAR FROM date_local), units_of_measure, parameter_name
    ORDER BY EXTRACT(YEAR FROM date_local) DESC
""")

In [120]:
query3.show()

+------------------------------+--------------------+--------------------+--------------------+
|year(CAST(date_local AS DATE))|      parameter_name|avg(arithmetic_mean)|    units_of_measure|
+------------------------------+--------------------+--------------------+--------------------+
|                          2017|   Trichloroethylene|0.003557620817843...|Parts per billion...|
|                          2017|  Chromium (TSP) STP|0.006750087719298247|Micrograms/cubic ...|
|                          2017|        13-Butadiene| 0.10028373702422147|Parts per billion...|
|                          2017|      Vinyl chloride|                 0.0|Parts per billion...|
|                          2017|     Dichloromethane|  0.6513717472118958|Parts per billion...|
|                          2017|             Benzene|  1.6149861979010498|Parts per billion...|
|                          2017|1122-Tetrachloroe...|            3.125E-4|Parts per billion...|
|                          2017|trans-13

In [121]:
# Average Benzene concentration, year by year, across the US

query4 = spark.sql("""
    SELECT EXTRACT(YEAR FROM date_local), AVG(arithmetic_mean), units_of_measure
    FROM pollution
    WHERE parameter_name = 'Benzene'
    GROUP BY EXTRACT(YEAR FROM date_local), units_of_measure
    ORDER BY EXTRACT(YEAR FROM date_local) DESC
""")

In [122]:
query4.show()

+------------------------------+--------------------+--------------------+
|year(CAST(date_local AS DATE))|avg(arithmetic_mean)|    units_of_measure|
+------------------------------+--------------------+--------------------+
|                          2017|  1.6149861979010498|Parts per billion...|
|                          2016|  1.2660918363206974|Parts per billion...|
|                          2015|  1.3288374331274566|Parts per billion...|
|                          2014|   1.324096420975431|Parts per billion...|
|                          2013|   1.451811187132259|Parts per billion...|
|                          2012|   1.603766487430922|Parts per billion...|
|                          2011|  1.5579733075711695|Parts per billion...|
|                          2010|  1.7227266005456587|Parts per billion...|
|                          2009|   1.894231711105646|Parts per billion...|
|                          2008|  2.3998395963532064|Parts per billion...|
|                        
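
To read the trend from this table, the relative change between years can be computed directly from the printed values. The sketch below copies the ten rows visible above (the output is cut off after 2008, so earlier years are left out); the helper `relative_change` is a hypothetical name introduced here.

```python
# (year, average Benzene concentration) pairs copied from the output above.
# The table is truncated after 2008, so earlier years are not included.
benzene_by_year = [
    (2017, 1.6149861979010498),
    (2016, 1.2660918363206974),
    (2015, 1.3288374331274566),
    (2014, 1.324096420975431),
    (2013, 1.451811187132259),
    (2012, 1.603766487430922),
    (2011, 1.5579733075711695),
    (2010, 1.7227266005456587),
    (2009, 1.894231711105646),
    (2008, 2.3998395963532064),
]


def relative_change(old, new):
    """Fractional change from old to new (negative means a decrease)."""
    return (new - old) / old


series = dict(benzene_by_year)
first_year, last_year = min(series), max(series)
change = relative_change(series[first_year], series[last_year])
print(f"{first_year} -> {last_year}: {change:+.1%}")

# Year-over-year changes, oldest first
ordered = sorted(benzene_by_year)
yoy = [
    (year, relative_change(prev_value, value))
    for (_, prev_value), (year, value) in zip(ordered, ordered[1:])
]
for year, delta in yoy:
    print(f"{year}: {delta:+.1%}")
```

Over the visible rows the average is lower in 2017 than in 2008, although 2017 itself is higher than 2016.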

In [130]:
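import plotly.graph_objs as go
from plotly.offline import init_notebook_mode, iplot

# Render inline in the notebook instead of uploading through plotly.plotly
init_notebook_mode(connected=True)

# query4 has no 'd1' column; use the column names shown in its output above.
# A bar chart of the yearly average replaces the histogram.
pdf = query4.toPandas()
data = [go.Bar(x=pdf['year(CAST(date_local AS DATE))'], y=pdf['avg(arithmetic_mean)'])]
iplot(data)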

Py4JJavaError: An error occurred while calling o1753.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(year(CAST(date_local AS DATE))#930 DESC NULLS LAST, 200)
+- *(2) HashAggregate(keys=[year(cast(date_local#21 as date))#954, units_of_measure#22], functions=[avg(arithmetic_mean#26)], output=[year(CAST(date_local AS DATE))#930, avg(arithmetic_mean)#929, units_of_measure#22])
   +- Exchange hashpartitioning(year(cast(date_local#21 as date))#954, units_of_measure#22, 200)
      +- *(1) HashAggregate(keys=[year(cast(date_local#21 as date)) AS year(cast(date_local#21 as date))#954, units_of_measure#22], functions=[partial_avg(arithmetic_mean#26)], output=[year(cast(date_local#21 as date))#954, units_of_measure#22, sum#947, count#948L])
         +- *(1) Project [date_local#21, units_of_measure#22, arithmetic_mean#26]
            +- *(1) Filter (isnotnull(parameter_name#18) && (parameter_name#18 = Benzene))
               +- *(1) FileScan csv [parameter_name#18,date_local#21,units_of_measure#22,arithmetic_mean#26] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/gunstringer/Downloads/epa_hap_daily_summary.csv], PartitionFilters: [], PushedFilters: [IsNotNull(parameter_name), EqualTo(parameter_name,Benzene)], ReadSchema: struct<parameter_name:string,date_local:timestamp,units_of_measure:string,arithmetic_mean:double>

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:374)
	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:610)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3255)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3255)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: Unsupported class file major version 55
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
	at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
	at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:309)
	at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:171)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:224)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 37 more
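
The root cause is the last `Caused by` line: `Unsupported class file major version 55`. Class-file major version 55 corresponds to Java 11, and the `java.base/` frames confirm that the JVM here is a modular JDK (9 or later). The bytecode reader bundled with this Spark build cannot read such class files, so a likely fix is to point `JAVA_HOME` at a Java 8 installation and restart the kernel. The sketch below shows how the number in the message maps to a Java release; both function names are hypothetical helpers, not part of Spark or Py4J.

```python
import re


def java_release_for_class_major(major):
    """Map a class-file major version to a Java release.

    For Java 5 and later the release is major - 44 (52 -> 8, 55 -> 11).
    """
    if major < 49:
        raise ValueError("versions before Java 5 are not covered by this sketch")
    return major - 44


def parse_unsupported_major(message):
    """Extract the major version number from the exception message, if present."""
    match = re.search(r"Unsupported class file major version (\d+)", message)
    return int(match.group(1)) if match else None


# The 'Caused by' line from the traceback above
cause = "java.lang.IllegalArgumentException: Unsupported class file major version 55"
major = parse_unsupported_major(cause)
print(major, "-> Java", java_release_for_class_major(major))
```

Running `java -version` in the same environment as the notebook shows which JDK the kernel is actually using.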
