# Introduction to Spark

## Basic initialization

`SparkSession` is used to connect to the Spark Cluster.

In [29]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import col, udf, trim, isnull,date_format
from pyspark.sql.types import FloatType, IntegerType, StringType


def to_float (s) :
   return float(s.replace(',','.'))
float_udf = udf(to_float , FloatType())

def to_int (s) :
   return int(s)
int_udf = udf(to_int , IntegerType())


SyntaxError: invalid syntax (<ipython-input-29-2df4f215b7fa>, line 4)

We will use Pandas to operate on the reduced data in the *driver program*.

In [2]:
import pandas as pd

Numpy will be always useful.

In [3]:
import numpy as np

Create a new session (or reuse an existing one).

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fcea8d5ec18>


We can see that the session is established.

## Creating Spark Data Frames from input files

In [6]:
file_path = "s3a://openaq-fetches/realtime/2019-02-02/1549113064.ndjson"

# Read in the airports data
smog = spark.read.json(file_path)
smog.printSchema()


root
 |-- attribution: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- averagingPeriod: struct (nullable = true)
 |    |-- unit: string (nullable = true)
 |    |-- value: double (nullable = true)
 |-- city: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- country: string (nullable = true)
 |-- date: struct (nullable = true)
 |    |-- local: string (nullable = true)
 |    |-- utc: string (nullable = true)
 |-- location: string (nullable = true)
 |-- mobile: boolean (nullable = true)
 |-- parameter: string (nullable = true)
 |-- sourceName: string (nullable = true)
 |-- sourceType: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- value: double (nullable = true)



In [7]:
smog.limit(10).toPandas()

Unnamed: 0,attribution,averagingPeriod,city,coordinates,country,date,location,mobile,parameter,sourceName,sourceType,unit,value
0,"[(StateAir.net, http://www.stateair.net/web/po...","(hours, 1.0)",Chengdu,"(30.63, 104.07)",CN,"(2019-02-02T21:00:00+08:00, 2019-02-02T13:00:0...",Chengdu,False,pm25,Chengdu,government,µg/m³,56.0
1,"[(StateAir.net, http://www.stateair.net/web/po...","(hours, 1.0)",Shanghai,"(31.21, 121.44)",CN,"(2019-02-02T21:00:00+08:00, 2019-02-02T13:00:0...",Shanghai,False,pm25,Shanghai,government,µg/m³,44.0
2,"[(StateAir.net, http://www.stateair.net/web/po...","(hours, 1.0)",Shenyang,"(41.78, 123.42)",CN,"(2019-02-02T21:00:00+08:00, 2019-02-02T13:00:0...",Shenyang,False,pm25,Shenyang,government,µg/m³,64.0
3,"[(StateAir.net, http://www.stateair.net/web/po...","(hours, 1.0)",Beijing,"(39.95, 116.47)",CN,"(2019-02-02T21:00:00+08:00, 2019-02-02T13:00:0...",Beijing US Embassy,False,pm25,Beijing US Embassy,government,µg/m³,156.0
4,"[(StateAir.net, http://www.stateair.net/web/po...","(hours, 1.0)",Guangzhou,"(23.12, 113.32)",CN,"(2019-02-02T21:00:00+08:00, 2019-02-02T13:00:0...",Guangzhou,False,pm25,Guangzhou,government,µg/m³,45.0
5,"[(Pollution Control Department, http://www.aqm...","(hours, 1.0)",Bangkok,"(13.732878086814, 100.48601837802)",TH,"(2019-02-02T19:00:00+07:00, 2019-02-02T12:00:0...","Hiran Ruchi, Thon Buri",False,co,Thailand,government,ppm,0.58
6,"[(Pollution Control Department, http://www.aqm...","(hours, 1.0)",Bangkok,"(13.732878086814, 100.48601837802)",TH,"(2019-02-02T19:00:00+07:00, 2019-02-02T12:00:0...","Hiran Ruchi, Thon Buri",False,o3,Thailand,government,ppm,0.026
7,"[(Kosovo AQ, http://kosovo-airquality.com/secu...","(hours, 1.0)",Drenas,"(42.625568, 20.89621)",XK,"(2019-02-02T14:00:00+01:00, 2019-02-02T13:00:0...",Drenas,False,no2,Kosovo,government,µg/m³,47.667
8,"[(Luftkvalitet.info, http://www.luftkvalitet.i...","(hours, 1.0)",Oslo,"(59.93233, 10.72447)",NO,"(2019-02-02T14:00:00+01:00, 2019-02-02T13:00:0...",Kirkeveien,False,co,Norway,government,µg/m³,256.268
9,"[(Pollution Control Department, http://www.aqm...","(hours, 1.0)",Bangkok,"(13.732878086814, 100.48601837802)",TH,"(2019-02-02T19:00:00+07:00, 2019-02-02T12:00:0...","Hiran Ruchi, Thon Buri",False,so2,Thailand,government,ppm,0.0


In [8]:
# "Spłaszczenie" schematu ramki
smog_df=smog.select("location", "city", "parameter", "unit", "value",\
                col("date.local").alias("date_local"),\
                col("date.utc").alias("date_utc"),\
                "coordinates.*", "averagingPeriod", "sourceName", "sourceType", "mobile")
smog_df.limit(10).toPandas()

Unnamed: 0,location,city,parameter,unit,value,date_local,date_utc,latitude,longitude,averagingPeriod,sourceName,sourceType,mobile
0,Chengdu,Chengdu,pm25,µg/m³,56.0,2019-02-02T21:00:00+08:00,2019-02-02T13:00:00.000Z,30.63,104.07,"(hours, 1.0)",Chengdu,government,False
1,Shanghai,Shanghai,pm25,µg/m³,44.0,2019-02-02T21:00:00+08:00,2019-02-02T13:00:00.000Z,31.21,121.44,"(hours, 1.0)",Shanghai,government,False
2,Shenyang,Shenyang,pm25,µg/m³,64.0,2019-02-02T21:00:00+08:00,2019-02-02T13:00:00.000Z,41.78,123.42,"(hours, 1.0)",Shenyang,government,False
3,Beijing US Embassy,Beijing,pm25,µg/m³,156.0,2019-02-02T21:00:00+08:00,2019-02-02T13:00:00.000Z,39.95,116.47,"(hours, 1.0)",Beijing US Embassy,government,False
4,Guangzhou,Guangzhou,pm25,µg/m³,45.0,2019-02-02T21:00:00+08:00,2019-02-02T13:00:00.000Z,23.12,113.32,"(hours, 1.0)",Guangzhou,government,False
5,"Hiran Ruchi, Thon Buri",Bangkok,co,ppm,0.58,2019-02-02T19:00:00+07:00,2019-02-02T12:00:00.000Z,13.732878,100.486018,"(hours, 1.0)",Thailand,government,False
6,"Hiran Ruchi, Thon Buri",Bangkok,o3,ppm,0.026,2019-02-02T19:00:00+07:00,2019-02-02T12:00:00.000Z,13.732878,100.486018,"(hours, 1.0)",Thailand,government,False
7,Drenas,Drenas,no2,µg/m³,47.667,2019-02-02T14:00:00+01:00,2019-02-02T13:00:00.000Z,42.625568,20.89621,"(hours, 1.0)",Kosovo,government,False
8,Kirkeveien,Oslo,co,µg/m³,256.268,2019-02-02T14:00:00+01:00,2019-02-02T13:00:00.000Z,59.93233,10.72447,"(hours, 1.0)",Norway,government,False
9,"Hiran Ruchi, Thon Buri",Bangkok,so2,ppm,0.0,2019-02-02T19:00:00+07:00,2019-02-02T12:00:00.000Z,13.732878,100.486018,"(hours, 1.0)",Thailand,government,False


Zapytanie 1: znaleźć miasto, w którym najniższe zanieczyszczenie w wybranym miesiącu (roku) jest największe spośród wszystkich miast

In [17]:
smogDaily = spark.read.csv("s3a://openaq-fetches/daily/2017-09-07.csv").toDF('location', 'value', 'unit', 'parameter', 'country', 'city', 'region', 'date_local', 'date_utc','sourceType', 'mobile', 'latitude', 'longitude', 'avgPeriod', 'avgPeriodUnits')
smogDaily = smogDaily.withColumn('valueF',float_udf(col('value')))

smogDaily.groupBy("location").min("valueF").orderBy("min(valueF)", ascending=False).limit(1).toPandas()

Unnamed: 0,location,min(valueF)
0,Кожуховский проезд,230.0


In [None]:
Zapytanie 2: ranking miast pod względem liczby dni w miesiącu (roku), w których poziom jest wyższy niż X

In [34]:
smogDaily.select(col("location"),col("value"), date_format(col("date_local"),"MM-dd").alias("MM"),date_format(col("date_local"),"dd").alias("dd")).show()

+---------------+-----+-----+---+
|       location|value|   MM| dd|
+---------------+-----+-----+---+
|        Wynyard|    4|09-06| 06|
|        Wynyard|    8|09-06| 06|
|      Emu River|    1|09-06| 06|
|      Emu River|    5|09-06| 06|
|West Ulverstone|    2|09-06| 06|
|West Ulverstone|    6|09-06| 06|
|      Devonport|    2|09-06| 06|
|      Devonport|    7|09-06| 06|
|      Sheffield|   58|09-06| 06|
|      Sheffield|   64|09-06| 06|
|      Deloraine|    0|09-06| 06|
|      Deloraine|    4|09-06| 06|
|       Westbury|   14|09-06| 06|
|       Westbury|   18|09-06| 06|
|        Hadspen|    2|09-06| 06|
|        Hadspen|    4|09-06| 06|
|       Longford|    1|09-06| 06|
|       Longford|    3|09-06| 06|
|          Perth|    1|09-06| 06|
|          Perth|    5|09-06| 06|
+---------------+-----+-----+---+
only showing top 20 rows

