# Extract, Transform and Load using Apache Spark SQL

This notebook demonstrates how to use Apache Spark to extract, transform and load (ETL) weather observation data stored in IBM Cloud Object Storage. The hourly observations are aggregated to show the minimum, average and maximum day and night temperatures per day. The aggregated information is finally stored in a PostgreSQL database.

## Extracting Weather Observation Data 

This section reads a JSON file from IBM Cloud Object Storage that contains weather observation data from the Weather Data Company. The information will be stored in a Spark data frame.

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20190520072501-0000
KERNEL_ID = 5a3878a5-6ceb-47d7-8c83-bbcb269a0ae6


In [2]:
df = spark.read.json(path_1)

In [3]:
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- blunt_phrase: string (nullable = true)
 |-- class: string (nullable = true)
 |-- clds: string (nullable = true)
 |-- day_ind: string (nullable = true)
 |-- dewPt: long (nullable = true)
 |-- expire_time_gmt: long (nullable = true)
 |-- feels_like: long (nullable = true)
 |-- gust: long (nullable = true)
 |-- heat_index: long (nullable = true)
 |-- icon_extd: long (nullable = true)
 |-- key: string (nullable = true)
 |-- max_temp: string (nullable = true)
 |-- min_temp: string (nullable = true)
 |-- obs_id: string (nullable = true)
 |-- obs_name: string (nullable = true)
 |-- precip_hrly: string (nullable = true)
 |-- precip_total: string (nullable = true)
 |-- pressure: double (nullable = true)
 |-- pressure_desc: string (nullable = true)
 |-- pressure_tend: string (nullable = true)
 |-- primary_swell_direction: string (nullable = true)
 |-- primary_swell_height: string (nullable = true)
 |-- primary_swell_period: string (nullabl
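
The `_corrupt_record` column in the schema is worth a note. By default `spark.read.json` expects one JSON object per line and runs in permissive mode, so a line that cannot be parsed is kept as raw text in `_corrupt_record` instead of aborting the read. The following is a minimal plain-Python sketch of that idea, not Spark's implementation; the function name `parse_json_lines` and the sample lines are hypothetical.

```python
import json

def parse_json_lines(lines):
    """Parse JSON Lines text, keeping malformed lines instead of failing."""
    rows = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines in this sketch
        try:
            record = json.loads(line)
            if not isinstance(record, dict):
                raise ValueError("not a JSON object")
            rows.append(record)
        except ValueError:
            # json.JSONDecodeError is a subclass of ValueError.
            # Keep the raw text so the bad line can be inspected later.
            rows.append({"_corrupt_record": line})
    return rows

# Hypothetical sample input: two valid observations and one truncated line.
sample = [
    '{"class": "observation", "obs_name": "Hamburg/Finkenwerder", "temp": 8}',
    '{"class": "observation", "obs_name": "Hamburg/Finkenwerder", "temp"',
    '{"class": "observation", "obs_name": "Hamburg/Finkenwerder", "temp": 13}',
]

rows = parse_json_lines(sample)
corrupt = [r for r in rows if "_corrupt_record" in r]
print(len(rows), len(corrupt))
```

If the real data frame contains rows where `_corrupt_record` is not null, it may be worth inspecting them before aggregating.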

## Transforming Weather Observation Data

In this section the Spark data frame will be transformed by aggregating temperatures on a daily basis to show the minimum, average and maximum day and night temperature for each location. A Spark SQL query is used for this purpose. A new data frame will then be created that represents the aggregated information in a relational format.
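
As a rough illustration of what this aggregation computes, the grouping logic can be sketched in plain Python without Spark. The sample records, the `epoch` helper and the `aggregate` function are hypothetical; the field names follow the observation data used in this notebook. The sketch formats dates in UTC, whereas Spark uses the session time zone, and Python's `round` can differ from SQL `ROUND` on exact ties.

```python
from collections import defaultdict
from datetime import datetime, timezone

def epoch(year, month, day, hour):
    """Return a UTC epoch timestamp in seconds (helper for the sample data)."""
    return int(datetime(year, month, day, hour, tzinfo=timezone.utc).timestamp())

# Hypothetical hourly records; only rows with class 'observation' are aggregated.
observations = [
    {"class": "observation", "obs_name": "Hamburg/Finkenwerder", "day_ind": "D",
     "valid_time_gmt": epoch(2019, 4, 28, 10), "temp": 8},
    {"class": "observation", "obs_name": "Hamburg/Finkenwerder", "day_ind": "D",
     "valid_time_gmt": epoch(2019, 4, 28, 14), "temp": 13},
    {"class": "observation", "obs_name": "Hamburg/Finkenwerder", "day_ind": "N",
     "valid_time_gmt": epoch(2019, 4, 28, 22), "temp": 10},
    {"class": "forecast", "obs_name": "Hamburg/Finkenwerder", "day_ind": "D",
     "valid_time_gmt": epoch(2019, 4, 28, 12), "temp": 99},
]

def aggregate(records):
    """Group by (obs_name, obs_date, day_ind) and compute min, avg and max."""
    groups = defaultdict(list)
    for rec in records:
        if rec["class"] != "observation":
            continue  # corresponds to the WHERE clause
        obs_date = datetime.fromtimestamp(
            rec["valid_time_gmt"], tz=timezone.utc
        ).strftime("%Y-%m-%d")
        groups[(rec["obs_name"], obs_date, rec["day_ind"])].append(rec["temp"])

    result = []
    for key in sorted(groups):  # corresponds to the ORDER BY clause
        temps = groups[key]
        avg = round(sum(temps) / len(temps), 1)
        result.append(key + (min(temps), avg, max(temps)))
    return result

rows = aggregate(observations)
for row in rows:
    print(row)
```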

In [4]:
df.createOrReplaceTempView("observations")

In [5]:
query = "WITH wd AS (SELECT day_ind, from_unixtime(valid_time_gmt,'yyyy-MM-dd') as obs_date, obs_name, temp FROM observations WHERE class = 'observation') \n " + \
        "SELECT wd.obs_name, wd.obs_date, wd.day_ind, ROUND(MIN(wd.temp),1) AS mintemp, ROUND(AVG(wd.temp),1) AS avgtemp, ROUND(MAX(wd.temp),1) AS maxtemp \n " + \
        "FROM wd \n " + \
        "GROUP BY wd.obs_name, wd.obs_date, wd.day_ind \n " + \
        "ORDER BY wd.obs_name, wd.obs_date, wd.day_ind"

print(query)

WITH wd AS (SELECT day_ind, from_unixtime(valid_time_gmt,'yyyy-MM-dd') as obs_date, obs_name, temp FROM observations WHERE class = 'observation') 
 SELECT wd.obs_name, wd.obs_date, wd.day_ind, ROUND(MIN(wd.temp),1) AS mintemp, ROUND(AVG(wd.temp),1) AS avgtemp, ROUND(MAX(wd.temp),1) AS maxtemp 
 FROM wd 
 GROUP BY wd.obs_name, wd.obs_date, wd.day_ind 
 ORDER BY wd.obs_name, wd.obs_date, wd.day_ind
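
A note on the date pattern passed to `from_unixtime`: it follows Java date pattern conventions, where lowercase `yyyy` is the calendar year and uppercase `YYYY` is the week-based year. The two agree for most dates but can differ in the days around New Year. The sketch below illustrates the difference using Python's ISO week-based year as an analogy; Java's week-year rules depend on the locale, so the affected dates may not match exactly.

```python
from datetime import date

def calendar_year(d):
    """Calendar year, comparable to the 'yyyy' pattern."""
    return d.year

def iso_week_year(d):
    """ISO 8601 week-based year, used here as an analogy for 'YYYY'."""
    return d.isocalendar()[0]

# A date from the observation period and a date at a year boundary.
samples = [date(2019, 4, 28), date(2018, 12, 31)]

for d in samples:
    print(d.isoformat(), calendar_year(d), iso_week_year(d))
```

For a date in April both values agree, but Monday 2018-12-31 already belongs to the first ISO week of 2019, so a week-based pattern would label it with the wrong year.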


In [6]:
sqlDF = spark.sql(query)
sqlDF.show()

+--------------------+----------+-------+-------+-------+-------+
|            obs_name|  obs_date|day_ind|mintemp|avgtemp|maxtemp|
+--------------------+----------+-------+-------+-------+-------+
|Hamburg/Finkenwerder|2019-04-28|      D|      8|    8.9|     13|
|Hamburg/Finkenwerder|2019-04-28|      N|      8|    9.0|     10|
|Hamburg/Finkenwerder|2019-04-29|      D|      8|   11.7|     17|
|Hamburg/Finkenwerder|2019-04-29|      N|      4|    8.0|     13|
|Hamburg/Finkenwerder|2019-04-30|      D|      4|   11.9|     17|
|Hamburg/Finkenwerder|2019-04-30|      N|      4|    8.0|     11|
|Hamburg/Finkenwerder|2019-05-01|      D|      9|    9.6|     11|
|Hamburg/Finkenwerder|2019-05-01|      N|      8|    8.6|      9|
|Hamburg/Finkenwerder|2019-05-02|      D|      8|    9.9|     13|
|Hamburg/Finkenwerder|2019-05-02|      N|      6|    7.4|      8|
|Hamburg/Finkenwerder|2019-05-03|      D|      4|    7.5|      9|
|Hamburg/Finkenwerder|2019-05-03|      N|      3|    4.1|      5|
|Hamburg/F

## Loading Weather Observation Data to a PostgreSQL Database

In this section the aggregated data will be converted to a pandas data frame and stored in a PostgreSQL table using pandas and the SQLAlchemy API.

In [10]:
import sqlalchemy
import pandas as pd
sqlalchemy.__version__ 

'1.1.13'

In [8]:
postgreSQLEndpoint = 'postgresql://<username>:<password>@9a0a10ef-e7b8-402f-a80f-7d5d4e716622.bc28ac43cf10402584b5f01db462d330.databases.appdomain.cloud:30476/ibmclouddb'#?sslmode=verify-full'
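
When the user name or password contains characters such as `@`, `/` or `:`, they need to be URL-encoded before being placed in a SQLAlchemy connection URL. A minimal sketch of building such a URL follows; the helper `build_postgres_url`, the credentials and the host `db.example.com` are hypothetical placeholders. It uses the `postgresql://` scheme, which is the dialect name newer SQLAlchemy releases expect, and appends `sslmode` as a query parameter on the assumption that the driver (for example psycopg2) accepts it.

```python
from urllib.parse import quote_plus

def build_postgres_url(user, password, host, port, database, sslmode=None):
    """Assemble a SQLAlchemy-style PostgreSQL URL with encoded credentials."""
    url = "postgresql://{}:{}@{}:{}/{}".format(
        quote_plus(user),      # encode special characters in the user name
        quote_plus(password),  # encode special characters in the password
        host,
        port,
        database,
    )
    if sslmode:
        # Assumption: the driver understands the libpq 'sslmode' parameter.
        url += "?sslmode=" + quote_plus(sslmode)
    return url

# Hypothetical values for illustration only.
url = build_postgres_url(
    "admin", "p@ss/word:1", "db.example.com", 30476, "ibmclouddb",
    sslmode="verify-full",
)
print(url)
```

The resulting string can be passed to `sqlalchemy.create_engine` in place of a hand-written endpoint. Note that `verify-full` usually also requires the server's CA certificate to be available to the client.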

In [9]:
engine = sqlalchemy.create_engine(postgreSQLEndpoint)


In [11]:
pandas_df = sqlDF.toPandas()

In [12]:
pandas_df.to_sql(name = 'weather-observation-temperatures', con=engine, if_exists='replace', index=False, chunksize=1000)
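
With `if_exists='replace'` pandas drops an existing table before inserting the new values, and `chunksize` writes the rows in batches. Roughly the same steps can be sketched by hand. The example below uses the standard library's `sqlite3` with an in-memory database as a stand-in for PostgreSQL, so it is an illustration of the semantics rather than the code path pandas takes; the function `replace_table` and the column types are assumptions. It also shows that a table name containing hyphens has to be written as a quoted identifier in SQL.

```python
import sqlite3

TABLE = "weather-observation-temperatures"
# Hyphens are not allowed in bare identifiers, so the name is double-quoted.
QUOTED = '"' + TABLE.replace('"', '""') + '"'

def replace_table(conn, rows, chunksize=1000):
    """Drop and recreate the table, then insert the rows in batches."""
    conn.execute("DROP TABLE IF EXISTS " + QUOTED)
    conn.execute(
        "CREATE TABLE " + QUOTED + " ("
        "obs_name TEXT, obs_date TEXT, day_ind TEXT, "
        "mintemp INTEGER, avgtemp REAL, maxtemp INTEGER)"
    )
    for start in range(0, len(rows), chunksize):
        conn.executemany(
            "INSERT INTO " + QUOTED + " VALUES (?, ?, ?, ?, ?, ?)",
            rows[start:start + chunksize],
        )
    conn.commit()

# Two rows taken from the aggregated output shown in this notebook.
rows = [
    ("Hamburg/Finkenwerder", "2019-04-28", "D", 8, 8.9, 13),
    ("Hamburg/Finkenwerder", "2019-04-28", "N", 8, 9.0, 10),
]

conn = sqlite3.connect(":memory:")
replace_table(conn, rows)
replace_table(conn, rows)  # running the load again replaces the table contents

count = conn.execute("SELECT COUNT(*) FROM " + QUOTED).fetchone()[0]
print(count)
```

Because the table is replaced on each run, repeating the load does not duplicate rows.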

In [13]:
df = pd.read_sql_table('weather-observation-temperatures', engine)
df.head(5)

| | obs_name | obs_date | day_ind | mintemp | avgtemp | maxtemp |
|---|---|---|---|---|---|---|
| 0 | Hamburg/Finkenwerder | 2019-04-28 | D | 8 | 8.9 | 13 |
| 1 | Hamburg/Finkenwerder | 2019-04-28 | N | 8 | 9.0 | 10 |
| 2 | Hamburg/Finkenwerder | 2019-04-29 | D | 8 | 11.7 | 17 |
| 3 | Hamburg/Finkenwerder | 2019-04-29 | N | 4 | 8.0 | 13 |
| 4 | Hamburg/Finkenwerder | 2019-04-30 | D | 4 | 11.9 | 17 |
