In [13]:
# Import findspark to read SPARK_HOME and HADOOP_HOME
import findspark
findspark.init()

In [14]:
# Import required library
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [15]:
# Print Spark object ID
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f9a948eea90>


In [16]:
# Datasets can be downloaded from https://catalog.data.gov/dataset/crimes-2001-to-present-398a4

df3 = spark.read.csv("epa_co_daily_summary.csv", header=True, inferSchema=True)

In [17]:
df3.count()

8064820

In [18]:
df3.show()

+----------+-----------+--------+--------------+---+---------+-----------+-------+---------------+--------------------+------------------+-------------------+-----------------+----------+-----------------+-------------------+---------------+---------------+--------------+----+-----------+--------------------+--------------------+--------------------+--------------+--------------+----------------+--------------------+-------------------+
|state_code|county_code|site_num|parameter_code|poc| latitude|  longitude|  datum| parameter_name|     sample_duration|pollutant_standard|         date_local| units_of_measure|event_type|observation_count|observation_percent|arithmetic_mean|first_max_value|first_max_hour| aqi|method_code|         method_name|     local_site_name|             address|    state_name|   county_name|       city_name|           cbsa_name|date_of_last_change|
+----------+-----------+--------+--------------+---+---------+-----------+-------+---------------+--------------------

In [19]:
df3.schema

StructType(List(StructField(state_code,IntegerType,true),StructField(county_code,IntegerType,true),StructField(site_num,IntegerType,true),StructField(parameter_code,IntegerType,true),StructField(poc,IntegerType,true),StructField(latitude,DoubleType,true),StructField(longitude,DoubleType,true),StructField(datum,StringType,true),StructField(parameter_name,StringType,true),StructField(sample_duration,StringType,true),StructField(pollutant_standard,StringType,true),StructField(date_local,TimestampType,true),StructField(units_of_measure,StringType,true),StructField(event_type,StringType,true),StructField(observation_count,IntegerType,true),StructField(observation_percent,DoubleType,true),StructField(arithmetic_mean,DoubleType,true),StructField(first_max_value,DoubleType,true),StructField(first_max_hour,IntegerType,true),StructField(aqi,IntegerType,true),StructField(method_code,IntegerType,true),StructField(method_name,StringType,true),StructField(local_site_name,StringType,true),StructField

In [22]:
# Register the DataFrame as a SQL temporary view
df3.createOrReplaceTempView("carbon")

In [137]:
result = spark.sql("SELECT DISTINCT sample_duration FROM carbon")

In [138]:
result.show()

+--------------------+
|     sample_duration|
+--------------------+
|              1 HOUR|
|8-HR RUN AVG END ...|
+--------------------+



In [144]:
#jumlah banyaknya kejadian/update an carbon monoxide pada setiap kota, yang dihitung berdasarkan tanggal kejadian
#format tanggal yyyy/mm/dd

scene = spark.sql("SELECT city_name ,TO_DATE(`date_local`,'YYYY/MM/DD') AS date, COUNT(`date_local`) \
                    FROM carbon \
                    GROUP BY `date_local`,city_name \
                    ORDER BY `date_local` DESC")


In [145]:
scene.show()

+--------------------+----------+-----------------+
|           city_name|      date|count(date_local)|
+--------------------+----------+-----------------+
|        Cedar Rapids|2017-05-01|                1|
|          Des Moines|2017-05-01|                1|
|        Indianapolis|2017-05-01|                1|
|               Ogden|2017-05-01|                1|
|           New Paris|2017-05-01|                1|
|       Oklahoma City|2017-05-01|                2|
|              Dayton|2017-05-01|                1|
|            Portland|2017-05-01|                1|
|           Davenport|2017-05-01|                1|
|         Baton Rouge|2017-05-01|                1|
|  Dentsville (Dents)|2017-05-01|                1|
|           Charlotte|2017-05-01|                2|
|      Salt Lake City|2017-05-01|                1|
|         New Orleans|2017-05-01|                1|
|          Birmingham|2017-05-01|                2|
|               Provo|2017-05-01|                1|
|          W

In [153]:
#menampilkan kota yang memiliki good air quality 
#good air quality memiliki rentang antara 0 dan 50

quality = spark.sql("SELECT city_name, aqi\
            FROM carbon WHERE aqi between 0 and 50 \
            ORDER BY aqi DESC")

In [154]:
quality.show()

+--------------------+---+
|           city_name|aqi|
+--------------------+---+
|             Phoenix| 50|
|             Norfolk| 50|
|              Denver| 50|
|            San Juan| 50|
|            Portland| 50|
|              Nashua| 50|
|          Birmingham| 50|
|             Spokane| 50|
|         New Orleans| 50|
|            San Jose| 50|
|              Pomona| 50|
|              Denver| 50|
|               Chico| 50|
|        Redwood City| 50|
|               Salem| 50|
|Boise (corporate ...| 50|
|             Phoenix| 50|
|           Las Vegas| 50|
|             Spokane| 50|
|            Mexicali| 50|
+--------------------+---+
only showing top 20 rows



In [180]:
#kota yang terakhir meng update data carbon monoxide di kotanya


update = spark.sql("SELECT city_name ,date_of_last_change \
                FROM carbon \
                order by TO_DATE(`date_local`,'YYYY/MM/DD') DESC limit 1")

In [181]:
update.show()

+---------+-------------------+
|city_name|date_of_last_change|
+---------+-------------------+
|    Provo|2017-05-03 00:00:00|
+---------+-------------------+

