# BDA2 - Spark SQL

Redo the exercises BDA1 using Spark SQL whenever possible. There are two ways to write queries in SparkSQL - using built-in API functions or running SQL-like
queries. To pass this lab, you need to use built-in API functions for all the questions, in addition, SQL-like queries for the second question. The slides of this link (https://www.ida.liu.se/~732A54/lab/SparkSQLQuickIntro.pdf) show some examples of Spark SQL.

## Setup

The following code is used to prepare the data files, execute only once and **be careful!**

In [1]:
import pyspark
import pyspark.sql
from pyspark.sql import SQLContext, Row
from pyspark.sql import functions as F
from pyspark.sql import Window
import csv
import math
import sys
import pandas as pd

tr_path = "../data/temperature-readings.csv"
tr_big_path = "../data/temperature-readings-big.csv"
pr_path = "../data/precipitation-readings.csv"
sro_path = "../data/stations-Ostergotland.csv"
sr_path = "../data/stations.csv"

In [2]:
sc = pyspark.SparkContext(appName = "Temperature")
sc_sql = pyspark.SQLContext(sc)

temperature_readings = sc.textFile(tr_path)
temperature_readings_big = sc.textFile(tr_big_path)
precipitation_readings = sc.textFile(pr_path)
station_oster_readings = sc.textFile(sro_path)
station_readings = sc.textFile(sr_path)

## Assignment 1

What are the lowest and highest temperatures measured each year for the period 1950- 2014. Provide the lists sorted in the descending order with respect to the maximum temperature. In this exercise you will use the `temperature-readings.csv` file.

In [3]:
%%time

res = temperature_readings.map(lambda l: l.split(";"))
res = res.map(lambda p: Row(year=int(p[1].split("-")[0]), temp=float(p[3])))
res = sc_sql.createDataFrame(res)
res = res.filter((res['year'] >= 1950) & (res['year'] <= 2014)).groupBy('year')
res_max = res.agg(F.max('temp').alias('temp_max'))
res_min = res.agg(F.min('temp').alias('temp_min'))

global df

df = res_max.join(res_min, "year").orderBy('year', ascending=False).toPandas()

CPU times: user 80.9 ms, sys: 37.3 ms, total: 118 ms
Wall time: 5min 17s


In [4]:
df.head(10)

Unnamed: 0,year,temp_max,temp_min
0,2014,34.4,-42.5
1,2013,31.6,-40.7
2,2012,31.3,-42.7
3,2011,32.5,-42.0
4,2010,34.4,-41.7
5,2009,31.5,-38.5
6,2008,32.2,-39.3
7,2007,32.2,-40.7
8,2006,32.7,-40.6
9,2005,32.1,-39.4


**Task 1a:** What are the lowest and highest temperature measured each year for the period 1950 to 2014? Provide the list sorted in the descending order with respect to the maximum temperature. Extend the program to include the station number (not the station name) where the maximum/minimum temperature was measured.

In [None]:
%%time

res = temperature_readings.map(lambda l: l.split(";"))
res = res.map(lambda p: Row(station=int(p[0]), year=int(p[1].split("-")[0]), temp=float(p[3])))
res = sc_sql.createDataFrame(res)
res = res.filter((res['year'] >= 1950) & (res['year'] <= 2014))

w = Window.partitionBy('year')

res_max = res.withColumn('temp_max', F.max('temp').over(w)).where(F.col('temp') == F.col('temp_max')).drop('temp')
res_min = res.withColumn('temp_min', F.min('temp').over(w)).where(F.col('temp') == F.col('temp_min')).drop('temp')

global df

df = res_max.join(res_min, "year").orderBy('year', ascending=False).toPandas()

In [None]:
df.head(10)

## Assignment 2

Count the number of readings for each month in the period of 1950-2014 which are higher than 10 degrees. Repeat the exercise, this time taking only distinct readings from each station. That is, if a station reported a reading above 10 degrees in some month, then it appears only once in the count for that month. In this exercise you will use the `temperature-readings.csv` file.

The output should contain the following information:

Year, month, count

In [5]:
%%time

res = temperature_readings.map(lambda l: l.split(";"))
res = res.map(lambda p: Row(station=int(p[0]), date=p[1], year=int(p[1].split("-")[0]), month=int(p[1].split("-")[1]), yearMonth=p[1][0:7], time=p[2], temp=float(p[3])))
res = sc_sql.createDataFrame(res)
res = res.filter((res['year'] >= 1950) & (res['year'] <= 2014))
res = res.filter(res['temp'] > 10.0)

res = res.groupBy('yearMonth').count().select('yearMonth', F.col('count'))
res = res.select(res['yearMonth'][0:4].cast("Int").alias('year'), res['yearMonth'][6:8].cast("Int").alias('month'), res['count']).orderBy(['year', 'month'], ascending=False)

global df
df = res.toPandas()

CPU times: user 108 ms, sys: 13.9 ms, total: 122 ms
Wall time: 3min 35s


In [6]:
df.head(10)

Unnamed: 0,year,month,count
0,2014,12,3
1,2014,11,8139
2,2014,10,42191
3,2014,9,86090
4,2014,8,124045
5,2014,7,147681
6,2014,6,101711
7,2014,5,57250
8,2014,4,19862
9,2014,3,4213


The following configuration is used for both SQL statement.

In [7]:
db_temp = temperature_readings.map(lambda l: l.split(";")) \
            .filter(lambda m: (int(m[1][:4]) >= 1950 and int(m[1][:4]) <= 2014)) \
            .map(lambda m: Row(station = m[0], date = m[1], year = m[1].split("-")[0], month = m[1].split("-")[1],
                               time = m[2], temp = float(m[3]), quality = m[4]))

schema = sc_sql.createDataFrame(db_temp)

schema.registerTempTable("temp_schema")

In [8]:
%%time

global res

res = sc_sql.sql("""
    SELECT year, month, COUNT(year) AS count
    FROM temp_schema
    WHERE year >= 1950 AND year <= 2014 and temp > 10
    GROUP BY year, month
    ORDER BY cast(year as integer) DESC, cast(month as integer) DESC
""")

res = res.toPandas()

CPU times: user 73.9 ms, sys: 19.4 ms, total: 93.3 ms
Wall time: 3min 55s


In [9]:
res.head(10)

Unnamed: 0,year,month,count
0,2014,12,3
1,2014,11,8139
2,2014,10,42191
3,2014,9,86090
4,2014,8,124045
5,2014,7,147681
6,2014,6,101711
7,2014,5,57250
8,2014,4,19862
9,2014,3,4213


Now we repeat the exercise where we take only distinct readings from each station.

In [10]:
%%time

res = temperature_readings.map(lambda l: l.split(";"))
res = res.map(lambda p: Row(station=int(p[0]), date=p[1], year=int(p[1].split("-")[0]), month=int(p[1].split("-")[1]), yearMonth=p[1][0:7], time=p[2], temp=float(p[3])))
res = sc_sql.createDataFrame(res)
res = res.filter((res['year'] >= 1950) & (res['year'] <= 2014))
res = res.filter(res['temp'] > 10.0)

res = res.groupBy('yearMonth').count().select('yearMonth', F.col('count'))
res = res.groupBy('yearMonth').count().select('yearMonth', F.col('count'))
res = res.select(res['yearMonth'][0:4].cast("Int").alias('year'), res['yearMonth'][6:8].cast("Int").alias('month'), res['count']).orderBy(['year', 'month'], ascending=False)

global df
df = res.toPandas()

CPU times: user 85.1 ms, sys: 31.1 ms, total: 116 ms
Wall time: 3min 39s


In [11]:
df.head(10)

Unnamed: 0,year,month,count
0,2014,12,1
1,2014,11,1
2,2014,10,1
3,2014,9,1
4,2014,8,1
5,2014,7,1
6,2014,6,1
7,2014,5,1
8,2014,4,1
9,2014,3,1


In [12]:
%%time

global res

res = sc_sql.sql("""
    SELECT year, month, COUNT(DISTINCT year) AS count
    FROM temp_schema
    WHERE year >= 1950 AND year <= 2014 and temp > 10
    GROUP BY year, month
    ORDER BY cast(year as integer) DESC, cast(month as integer) DESC
""")

res = res.toPandas()

CPU times: user 68.6 ms, sys: 22.4 ms, total: 91 ms
Wall time: 3min 43s


In [13]:
res.head(10)

Unnamed: 0,year,month,count
0,2014,12,1
1,2014,11,1
2,2014,10,1
3,2014,9,1
4,2014,8,1
5,2014,7,1
6,2014,6,1
7,2014,5,1
8,2014,4,1
9,2014,3,1


## Assignment 3

Find the average monthly temperature for each available station in Sweden. Your result should include average temperature for each station for each month in the period of 1960-2014. Bear in mind that not every station has the readings for each month in this timeframe. In this exercise you will use the `temperature-readings.csvfile`.

The output should contain the following information:

Year, month, station number, average monthly temperature

In [14]:
%%time

res = temperature_readings.map(lambda l: l.split(";"))
res = res.map(lambda p: Row(station=int(p[0]), date=p[1], year=int(p[1].split("-")[0]), month=int(p[1].split("-")[1]), yearMonthStation=p[1][0:7] + '-' + p[0], temp=float(p[3])))
res = sc_sql.createDataFrame(res)
res = res.filter((res['year'] >= 1960) & (res['year'] <= 2014)).groupBy('yearMonthStation')

res = res.agg(F.avg('temp'))
res = res.select(res['yearMonthStation'][0:4].cast('Int').alias('year'),
                 res['yearMonthStation'][6:2].cast('Int').alias('month'),
                 res['yearMonthStation'][9:20].cast('Int').alias('station'),
                 F.round(res['avg(temp)'], 2).alias('temp_avg'))

global df
df = res.orderBy(['year', 'month', 'station'], ascending=False).toPandas()

CPU times: user 899 ms, sys: 37.5 ms, total: 936 ms
Wall time: 3min 33s


In [15]:
df.head(10)

Unnamed: 0,year,month,station,temp_avg
0,2014,12,192840,-11.48
1,2014,12,191910,-10.86
2,2014,12,191720,-11.33
3,2014,12,189720,-9.84
4,2014,12,188850,-7.64
5,2014,12,188820,-7.58
6,2014,12,188800,-6.94
7,2014,12,183750,-8.9
8,2014,12,182930,-9.77
9,2014,12,182910,-9.38


## Assignment 4

Provide  a  list  of  stations  with  their  associated  maximum  measured  temperatures  and maximum  measured  daily  precipitation.  Show  only  those  stations  where  the  maximum temperature is between 25 and 30 degrees and maximum daily precipitation is between 100 mm and 200mm. In this exercise you will use the `temperature-readings.csv` and `precipitation-readings.csv` files. 

The output should contain the following information:

Station number, maximum measured temperature, maximum daily precipitation

In [16]:
%%time

res_t = temperature_readings.map(lambda l: l.split(";"))
res_p = precipitation_readings.map(lambda l: l.split(";"))

res_p = res_p.map(lambda p: Row(station=int(p[0]), precipitation=float(p[3])))
res_p = sc_sql.createDataFrame(res_p)
res_p = res_p.filter((res_p['precipitation'] >= 100.0) & (res_p['precipitation'] <= 200.0))
res_p = res_p.groupBy('station').agg(F.max('precipitation').alias('precip_max'))

res_t = res_t.map(lambda p: Row(station=int(p[0]), date=p[1], year=int(p[1].split("-")[0]), month=int(p[1].split("-")[1]), temp=float(p[3])))
res_t = sc_sql.createDataFrame(res_t)
res_t = res_t.filter((res_t['temp'] >= 25.0) & (res_t['temp'] <= 30.0))
res_t = res_t.groupBy('station').agg(F.max('temp').alias('temp_max'))

global df

df = res_t.join(res_p, "station").orderBy('station', ascending=False).toPandas()

CPU times: user 92.4 ms, sys: 10.4 ms, total: 103 ms
Wall time: 3min 55s


In [17]:
df.head()

Unnamed: 0,station,temp_max,precip_max


## Assignment 5

Calculate the average monthly precipitation for the Östergotland region (list of stations is provided in the separate file) for the period 1993-2016. In order to do this, you will first need to calculate  the  total  monthly  precipitation  for  each  station  before calculating the monthly average (by averaging over stations). In this exercise you will use the `precipitation-readings.csv` and `stations-Ostergotland.csv` files.

HINT (not for the SparkSQL lab): Avoid using joins here! `stations-Ostergotland.csv` is small  and if distributed will cause a number of unnecessary shuffles when joined with precipitation RDD. If you distribute `precipitation-readings.csv` then either repartition your stations RDD to 1 partition or make use of the collect to acquire a python list and broadcast function to broadcast the list to all nodes.

The output should contain the following information:

Year, month, average monthly precipitation

In [18]:
%%time

res_p = precipitation_readings.map(lambda l: l.split(";"))
res_s = station_oster_readings.map(lambda l: l.split(";"))

res_s = res_s.map(lambda m: int(m[0])).collect()
res_s = sc.broadcast(value=res_s)

res_p = res_p.map(lambda p: Row(station = int(p[0]), dateStation = p[1] + '-' + p[0], year = int(p[1][0:4]), month = int(p[1][8:10]), precipitation = float(p[3])))
res_p = sc_sql.createDataFrame(res_p)
res_p = res_p.filter((res_p['year'] >= 1993) & (res_p['year'] <= 2016))

res = res_p.where(res_p['station'].isin(res_s.value))

res = res.groupBy('dateStation').agg(F.sum('precipitation').alias('precipitation_cum'))
res = res.select(res['dateStation'][0:7].alias('yearMonth'), res['precipitation_cum'])
res = res.groupBy('yearMonth').agg(F.avg('precipitation_cum').alias('precipitation_avg'))
res = res.select(res['yearMonth'][0:4].alias('year'),
                 res['yearMonth'][6:8].alias('month'),
                 res['precipitation_avg'])

global df

df = res.orderBy(['year', 'month'], ascending=False).toPandas()

CPU times: user 85.9 ms, sys: 60.6 ms, total: 147 ms
Wall time: 1min 4s


In [19]:
df.head(10)

Unnamed: 0,year,month,precipitation_avg
0,2016,7,0.0
1,2016,6,1.58875
2,2016,5,0.943548
3,2016,4,0.896667
4,2016,3,0.643952
5,2016,2,0.743534
6,2016,1,0.723077
7,2015,12,0.936842
8,2015,11,2.129583
9,2015,10,0.072984


## Assignment 6

Compare the average monthly temperature (find the difference) in the period 1950-2014 for all stations in Östergotland with long-term monthly averages in the period of 1950-1980. Make a plot of your results. 

HINT: The first step is to find the monthly averages for each station. Then, you can average over all stations to acquire the average temperature for a specific year and month. This RDD/DataFrame can beu sed to compute the long-term average by averaging over all the years in the interval.

The out put should contain the following information:

Year, month, difference

**Answer:** It seems like this exercise has been removed from last year to this year, as the instructions have chnaged from 1. april to 2. april 2019.

In [20]:
sc.stop()