In [3]:
import findspark
findspark.init()
import os
import pandas as pd
import requests
import json
import datetime
import pyspark
from pyspark.sql import *
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
import pprint
from config import Config

In [4]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages=com.amazonaws:aws-java-sdk:1.11.1034,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell'
config = Config()

In [5]:
conf = SparkConf().set('spark.executor.extraJavaOptions','-Dcom.amazonaws.services.s3.enableV4=true'). \
 set('spark.driver.extraJavaOptions','-Dcom.amazonaws.services.s3.enableV4=true'). \
 setAppName('weather_etl_aws').setMaster('local[*]')

In [6]:
sc = SparkContext(conf=conf)

In [7]:
sc.setSystemProperty('com.amazonaws.services.s3.enableV4', 'true')
spark = SparkSession(sc)
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
timestamp = datetime.datetime.timestamp(yesterday)
pp = pprint.PrettyPrinter(compact=True)

In [8]:
api_key = config.config.get('api', 'key')
access_key = config.config.get('aws', 'access_key')
secret_key = config.config.get('aws', 'secret_key')

## Hadoop AWS Configuration

In [9]:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", 'org.apache.hadoop.fs.s3a.S3AFileSystem')

## Functions

In [28]:
def get_data(state, lat, lon):
    weather_api_url = "https://api.openweathermap.org/data/2.5/onecall/timemachine?lat={}&lon={}&dt={}&units=imperial&appid={}".format(
        lat, lon, int(timestamp), api_key)
    response = requests.get(weather_api_url)
    response.raise_for_status()
    return response.text

## Load State Data

In [7]:
state_data = spark.read.csv('US_States.csv', inferSchema=True, header=True)
state_data.show()

+-------------+---------+-----------+
|        State|      Lat|        Lon|
+-------------+---------+-----------+
|    Wisconsin|     44.5|      -89.5|
|West Virginia|     39.0|      -80.5|
|      Vermont|     44.0| -72.699997|
|        Texas|     31.0|     -100.0|
| South Dakota|     44.5|     -100.0|
| Rhode Island|41.700001|      -71.5|
|       Oregon|     44.0|     -120.5|
|     New York|     43.0|      -75.0|
|New Hampshire|     44.0|      -71.5|
|     Nebraska|     41.5|     -100.0|
|       Kansas|     38.5|      -98.0|
|  Mississippi|     33.0|      -90.0|
|     Illinois|     40.0|      -89.0|
|     Delaware|     39.0|      -75.5|
|  Connecticut|41.599998| -72.699997|
|     Arkansas|34.799999| -92.199997|
|      Indiana|40.273502| -86.126976|
|     Missouri|38.573936|  -92.60376|
|      Florida|27.994402| -81.760254|
|       Nevada|39.876019|-117.224121|
+-------------+---------+-----------+
only showing top 20 rows



## Get Data From API

In [29]:
print("--------------------------------")
print('Getting data from API')
print("--------------------------------")
state_df = state_data \
    .select('State', 'Lat', 'Lon') \
    .rdd.map(lambda row: (row[0], get_data(row[0], row[1], row[2]))) \
    .toDF(['State', 'Data'])
print("--------------------------------")
print('Finished fetching data from API')
print("--------------------------------")

## Save JSON to file to limit API calls

In [30]:
print("--------------------------------")
print('Saving data to file')
print("--------------------------------")
state_df.write.mode('overwrite').json('raw_weather_data.json')
print("--------------------------------")
print('Finished saving data to json file')
print("--------------------------------")

In [13]:
df = spark.read.json('raw_weather_data.json')
df.printSchema()
df.show()

root
 |-- Data: string (nullable = true)
 |-- State: string (nullable = true)

+--------------------+-------------+
|                Data|        State|
+--------------------+-------------+
|{"lat":44.5,"lon"...|    Wisconsin|
|{"lat":39,"lon":-...|West Virginia|
|{"lat":44,"lon":-...|      Vermont|
|{"lat":31,"lon":-...|        Texas|
|{"lat":44.5,"lon"...| South Dakota|
|{"lat":41.7,"lon"...| Rhode Island|
|{"lat":44,"lon":-...|       Oregon|
|{"lat":43,"lon":-...|     New York|
|{"lat":44,"lon":-...|New Hampshire|
|{"lat":41.5,"lon"...|     Nebraska|
|{"lat":38.5,"lon"...|       Kansas|
|{"lat":33,"lon":-...|  Mississippi|
|{"lat":40,"lon":-...|     Illinois|
|{"lat":39,"lon":-...|     Delaware|
|{"lat":41.6,"lon"...|  Connecticut|
|{"lat":34.8,"lon"...|     Arkansas|
|{"lat":40.2735,"l...|      Indiana|
|{"lat":38.5739,"l...|     Missouri|
|{"lat":27.9944,"l...|      Florida|
|{"lat":39.876,"lo...|       Nevada|
+--------------------+-------------+
only showing top 20 rows



## Extract data into columns

In [15]:
api_json_example = df.select('Data').first()[0]

api_schema = df.select(F.schema_of_json(api_json_example)).first()[0]

df2 = df.withColumn('Data', F.from_json(F.col('Data'), schema=api_schema)).select('*', 'Data.*').drop('Data')
df2.printSchema()
df2.show()

root
 |-- State: string (nullable = true)
 |-- current: struct (nullable = true)
 |    |-- clouds: long (nullable = true)
 |    |-- dew_point: double (nullable = true)
 |    |-- dt: long (nullable = true)
 |    |-- feels_like: double (nullable = true)
 |    |-- humidity: long (nullable = true)
 |    |-- pressure: long (nullable = true)
 |    |-- sunrise: long (nullable = true)
 |    |-- sunset: long (nullable = true)
 |    |-- temp: double (nullable = true)
 |    |-- uvi: double (nullable = true)
 |    |-- visibility: long (nullable = true)
 |    |-- weather: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- icon: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- main: string (nullable = true)
 |    |-- wind_deg: long (nullable = true)
 |    |-- wind_speed: double (nullable = true)
 |-- hourly: array (nullable = true)
 |    |-- element: stru

## Flattening Hourly Data

In [16]:
flat_df = df2 \
    .withColumn('hourly', F.explode('hourly')) \
    .select('State', 'timezone', 'hourly.*') \
    .withColumnRenamed('timezone', 'Timezone') \
    .groupBy('State', 'Timezone') \
    .agg(F.max('temp').alias('Max_Temp'), \
         F.min('temp').alias('Min_Temp'), \
         F.max('weather.main')[0].alias('Conditions'), \
         F.max('weather.Description')[0].alias('Details'))
flat_df.printSchema()
flat_df.show()

root
 |-- State: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Max_Temp: double (nullable = true)
 |-- Min_Temp: double (nullable = true)
 |-- Conditions: string (nullable = true)
 |-- Details: string (nullable = true)

+--------------+-------------------+--------+--------+----------+----------------+
|         State|           Timezone|Max_Temp|Min_Temp|Conditions|         Details|
+--------------+-------------------+--------+--------+----------+----------------+
|    Washington|America/Los_Angeles|   75.29|   45.52|      Mist| overcast clouds|
|     Wisconsin|    America/Chicago|   79.21|   48.09|    Clouds|scattered clouds|
|   Connecticut|   America/New_York|   73.81|   57.83|    Clouds|scattered clouds|
|     Louisiana|    America/Chicago|   93.07|   73.63|      Haze|            haze|
|      Nebraska|    America/Chicago|  103.48|    68.0|      Rain|      light rain|
|South Carolina|   America/New_York|   86.14|   66.47|    Clouds|scattered clouds|
| West Vi

## Add yesterday Date to dataframe

In [17]:
flat_df = flat_df.withColumn('Date', F.lit(yesterday.date()))
flat_df.printSchema()
flat_df.show()

root
 |-- State: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Max_Temp: double (nullable = true)
 |-- Min_Temp: double (nullable = true)
 |-- Conditions: string (nullable = true)
 |-- Details: string (nullable = true)
 |-- Date: date (nullable = false)

+--------------+-------------------+--------+--------+----------+----------------+----------+
|         State|           Timezone|Max_Temp|Min_Temp|Conditions|         Details|      Date|
+--------------+-------------------+--------+--------+----------+----------------+----------+
|    Washington|America/Los_Angeles|   75.29|   45.52|      Mist| overcast clouds|2021-06-17|
|     Wisconsin|    America/Chicago|   79.21|   48.09|    Clouds|scattered clouds|2021-06-17|
|   Connecticut|   America/New_York|   73.81|   57.83|    Clouds|scattered clouds|2021-06-17|
|     Louisiana|    America/Chicago|   93.07|   73.63|      Haze|            haze|2021-06-17|
|      Nebraska|    America/Chicago|  103.48|    68.0|      Rai

## Final Dataframe

In [18]:
final_df = flat_df.select('Date', 'State', 'Timezone', 'Conditions', 'Details', 'Min_Temp', 'Max_Temp') \
    .orderBy(F.col('State').asc())
final_df.printSchema()
final_df.show(truncate=False)

root
 |-- Date: date (nullable = false)
 |-- State: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Conditions: string (nullable = true)
 |-- Details: string (nullable = true)
 |-- Min_Temp: double (nullable = true)
 |-- Max_Temp: double (nullable = true)

+----------+-----------+----------------------------+----------+----------------+--------+--------+
|Date      |State      |Timezone                    |Conditions|Details         |Min_Temp|Max_Temp|
+----------+-----------+----------------------------+----------+----------------+--------+--------+
|2021-06-17|Alabama    |America/Chicago             |Clouds    |overcast clouds |64.63   |88.86   |
|2021-06-17|Alaska     |America/Anchorage           |Clouds    |few clouds      |50.0    |71.6    |
|2021-06-17|Arizona    |America/Phoenix             |Clouds    |scattered clouds|67.48   |103.44  |
|2021-06-17|Arkansas   |America/Chicago             |Clouds    |scattered clouds|70.12   |89.89   |
|2021-06-17|Californi

### Save to parquet format locally

In [19]:
print("----------------------------------------------------")
print('Saving dataframe to parquet format')
print("----------------------------------------------------")
final_df.write.parquet('weather_df_output.parquet', mode="overwrite")
print("----------------------------------------------------")
print('Finished saving dataframe to parquet format')
print("----------------------------------------------------")

----------------------------------------------------
Saving dataframe to parquet format
----------------------------------------------------
----------------------------------------------------
Finished saving dataframe to parquet format
----------------------------------------------------


## Spark Dataframe in parquet to S3 bucket

### Init Spark to S3 connection

In [23]:
print("----------------------------------------------------")
print('Saving dataframe to parquet format to S3')
print("----------------------------------------------------")
final_df.write.parquet('s3a://jn-de-sample-bucket-2021/weather-etl/weather_df_output.parquet', mode='overwrite')
print("----------------------------------------------------")
print('Finished saving dataframe to parquet format to S3')
print("----------------------------------------------------")

--------------------------------
Saving dataframe to parquet format to S3
--------------------------------
--------------------------------
Finished saving dataframe to parquet format to S3
--------------------------------


### Read from parquet into dataframe

In [20]:
print("----------------------------------------------------")
print('Loading parquet data from S3')
print("----------------------------------------------------")
parq_df = spark.read.parquet("s3a://jn-de-sample-bucket-2021/weather-etl/weather_df_output.parquet")
parq_df.show()
print("----------------------------------------------------")
print('Finished loading parquet data from S3')
print("----------------------------------------------------")

----------------------------------------------------
Loading parquet data from S3
----------------------------------------------------
+----------+--------------+--------------------+----------+----------------+--------+--------+
|      Date|         State|            Timezone|Conditions|         Details|Min_Temp|Max_Temp|
+----------+--------------+--------------------+----------+----------------+--------+--------+
|2021-06-17|       Indiana|America/Indiana/I...|      Rain|scattered clouds|   54.79|   80.53|
|2021-06-17|North Carolina|    America/New_York|    Clouds|scattered clouds|   59.54|   83.43|
|2021-06-17|South Carolina|    America/New_York|    Clouds|scattered clouds|   66.47|   86.14|
|2021-06-17| Massachusetts|    America/New_York|    Clouds|scattered clouds|   57.22|   73.13|
|2021-06-17|   Connecticut|    America/New_York|    Clouds|scattered clouds|   57.83|   73.81|
|2021-06-17|  Pennsylvania|    America/New_York|    Clouds| overcast clouds|   50.16|   72.61|
|2021-06-1

## Using SQL queries on spark dataframe from S3

In [23]:
parq_df.createOrReplaceTempView("ParquetTable")
parq_sql = spark.sql("SELECT * FROM ParquetTable WHERE Conditions LIKE 'Rain'")
parq_sql.show()

+----------+-------------+--------------------+----------+----------------+--------+--------+
|      Date|        State|            Timezone|Conditions|         Details|Min_Temp|Max_Temp|
+----------+-------------+--------------------+----------+----------------+--------+--------+
|2021-06-17|      Indiana|America/Indiana/I...|      Rain|scattered clouds|   54.79|   80.53|
|2021-06-17|New Hampshire|    America/New_York|      Rain|scattered clouds|   51.94|   67.17|
|2021-06-17| Rhode Island|    America/New_York|      Rain|scattered clouds|   59.32|    75.2|
|2021-06-17|West Virginia|    America/New_York|      Rain| overcast clouds|   47.32|   73.02|
|2021-06-17|     Maryland|    America/New_York|      Rain|scattered clouds|   62.06|   79.05|
|2021-06-17|      Florida|    America/New_York|      Rain|scattered clouds|   71.78|    83.7|
|2021-06-17|     Nebraska|     America/Chicago|      Rain|      light rain|    68.0|  103.48|
+----------+-------------+--------------------+----------+--