## Glue PySpark ETL Notebook for Weather Datalake Demo

<img src="https://thecodinginterface-images.s3.amazonaws.com/blogposts/weather-data-lake/S3+Data+Lake.jpeg">

#### Purpose:

- Convert string data types to appropriate numeric data types
- Explode out nested, multi-valued fields, to be top level columns
- Drop unneeded columns
- Convert to Parquet format for more efficient downstream analysis


### More or Less Obligatory / Boilerplate Imports

In [1]:
import numpy as np
import pandas as pd

from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.session import SparkSession
from pyspark.sql.types import FloatType, StringType, IntegerType, DateType

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,,pyspark,idle,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

  return f(*args, **kwds)
  return f(*args, **kwds)

### Define some useful variables

In [2]:
glue_db = "weatherdata"
s3_bkt = "weather-data-collector-weatherdatalakes3bucket-1r8t1k304n84b"
rawweather_tbl = "rawweatherdata"
output_s3_path = "s3://{s3_bkt}/cleanweather".format(s3_bkt=s3_bkt)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Create Spark Session and Glue ETL Job Context Objects

In [3]:
spark = SparkSession(SparkContext.getOrCreate())
glue_ctx = GlueContext(SparkContext.getOrCreate())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Instantiate Glue Dynamic Data Frame Object from Glue Context and Glue Metadata Catalog

In [4]:
raw_dyf = glue_ctx.create_dynamic_frame.from_catalog(
                        database=glue_db,
                        table_name=rawweather_tbl
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Inspect the Schema of the Dataset

In [5]:
raw_dyf.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- date: string
|-- astronomy: array
|    |-- element: struct
|    |    |-- sunrise: string
|    |    |-- sunset: string
|    |    |-- moonrise: string
|    |    |-- moonset: string
|    |    |-- moon_phase: string
|    |    |-- moon_illumination: string
|-- maxtempC: string
|-- maxtempF: string
|-- mintempC: string
|-- mintempF: string
|-- avgtempC: string
|-- avgtempF: string
|-- totalSnow_cm: string
|-- sunHour: string
|-- uvIndex: string
|-- hourly: array
|    |-- element: struct
|    |    |-- time: string
|    |    |-- tempC: string
|    |    |-- tempF: string
|    |    |-- windspeedMiles: string
|    |    |-- windspeedKmph: string
|    |    |-- winddirDegree: string
|    |    |-- winddir16Point: string
|    |    |-- weatherCode: string
|    |    |-- weatherIconUrl: array
|    |    |    |-- element: struct
|    |    |    |    |-- value: string
|    |    |-- weatherDesc: array
|    |    |    |-- element: struct
|    |    |    |    |-- value: string
|    |    |-- precipMM: str

### Make reusable Callback for Spark User Defined Function to Process Various Columns

To be used to clean and aggregate nested multi-valued hourly column data

In [6]:
def process_hourly(hours, key, fn):
    nums = []
    for hr in hours:
        if hr[key]:
            try:
                num = float(hr[key])
                if pd.notnull(num):
                    nums.append(num)
            except Exception as e:
                logger.error({
                    "error": str(e),
                    "message": "error converting {} to float".format(hr[key])
                })
                raise e
    if nums:
        return float(fn(nums))

    return np.nan

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Create UDF to Calcuate Mean of Hourly Humidity Measurements

In [7]:
avg_humidity = F.udf(lambda hours: process_hourly(hours, 'humidity', np.mean), FloatType())

clean_df = raw_dyf.toDF().withColumn('avgHumidity',  avg_humidity('hourly'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Create UDF to Calc Min of Hourly Humidity Measurements

In [8]:
min_humidity = F.udf(lambda hours: process_hourly(hours, 'humidity', min), FloatType())

clean_df = clean_df.withColumn('avgHumidity',  min_humidity('hourly'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Create UDF to Calc Max of Hourly Humidity Measurements

In [9]:
max_humidity = F.udf(lambda hours: process_hourly(hours, 'humidity', max), FloatType())

clean_df = clean_df.withColumn('avgHumidity',  max_humidity('hourly'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Create UDF to Calc Total Precipitation From Hourly Precip Measurements

In [10]:
total_precipMM = F.udf(lambda hours: process_hourly(hours, 'precipMM', sum), FloatType())

clean_df = clean_df.withColumn('totalPrecipMM', total_precipMM('hourly'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Ok ... Glue / Spark is Useful Processing Nested Fields

Create remaining UDFs to process the nested hourly field and apply them then

In [11]:
avg_pressure = F.udf(lambda hours: process_hourly(hours, 'pressure', np.mean), FloatType())
min_pressure = F.udf(lambda hours: process_hourly(hours, 'pressure', min), FloatType())
max_pressure = F.udf(lambda hours: process_hourly(hours, 'pressure', max), FloatType())

clean_df = clean_df.withColumn(
            'avgPressure', avg_pressure('hourly')
        ).withColumn(
            'minPressure', min_pressure('hourly')
        ).withColumn(
            'maxPressure', max_pressure('hourly')
        )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Convert Field Thats Should Be Numeric to Numeric Types

In [12]:
clean_df.select('maxtempF', 'mintempF', 'avgtempF', 'totalSnow_cm', 'sunHour', 'uvIndex').printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- maxtempF: string (nullable = true)
 |-- mintempF: string (nullable = true)
 |-- avgtempF: string (nullable = true)
 |-- totalSnow_cm: string (nullable = true)
 |-- sunHour: string (nullable = true)
 |-- uvIndex: string (nullable = true)

In [13]:
clean_df = clean_df.withColumn(
            'maxtempF', F.col('maxtempF').cast(FloatType())
        ).withColumn(
            'mintempF', F.col('mintempF').cast(FloatType())
        ).withColumn(
            'avgtempF', F.col('avgtempF').cast(FloatType())
        ).withColumn(
            'totalSnow_cm', F.col('totalSnow_cm').cast(FloatType())
        ).withColumn(
            'sunHour', F.col('sunHour').cast(FloatType())
        ).withColumn(
            'uvIndex', F.col('uvIndex').cast(IntegerType())
        ).withColumn(
            'date', F.col('date').cast(DateType())
        )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Drop Columns We Are Not Interested In

In [14]:
clean_df = clean_df.drop('hourly', 'astronomy', 'mintempC', 'maxtempC', 'avgtempC')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Yay! Look at that Clean and Tidy Data

In [15]:
clean_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- date: date (nullable = true)
 |-- maxtempF: float (nullable = true)
 |-- mintempF: float (nullable = true)
 |-- avgtempF: float (nullable = true)
 |-- totalSnow_cm: float (nullable = true)
 |-- sunHour: float (nullable = true)
 |-- uvIndex: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- avgHumidity: float (nullable = true)
 |-- totalPrecipMM: float (nullable = true)
 |-- avgPressure: float (nullable = true)
 |-- minPressure: float (nullable = true)
 |-- maxPressure: float (nullable = true)

In [16]:
selected = [
    'date',
    'mintempF',
    'avgtempF',
    'maxtempF',
    'avgHumidity',
    'avgPressure',
    'totalPrecipMM'
]
clean_df.select(*selected).show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------+--------+--------+-----------+-----------+-------------+
|      date|mintempF|avgtempF|maxtempF|avgHumidity|avgPressure|totalPrecipMM|
+----------+--------+--------+--------+-----------+-----------+-------------+
|2021-01-06|    33.0|    34.0|    35.0|       99.0|  1016.2917|          9.6|
|2020-12-09|    46.0|    55.0|    63.0|       60.0|    1013.75|          0.0|
|2021-01-21|    33.0|    38.0|    43.0|       72.0|  1015.8333|          0.0|
|2021-01-05|    30.0|    37.0|    43.0|       89.0|  1014.3333|          0.7|
|2021-02-04|    21.0|    26.0|    32.0|       94.0|   1006.875|          1.0|
+----------+--------+--------+--------+-----------+-----------+-------------+
only showing top 5 rows

### Write Cleaned Data in Parquet Format Back to S3

In [17]:
clean_dyf = DynamicFrame.fromDF(
    clean_df.repartition("location", "year", "month"),
    glue_ctx,
    'cleanweather'
)

glue_ctx.purge_s3_path(output_s3_path, options={"retentionPeriod": 0})

glue_ctx.write_dynamic_frame.from_options(
    frame=clean_dyf,
    connection_type="s3",
    connection_options={"path": output_s3_path},
    format="parquet"
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<awsglue.dynamicframe.DynamicFrame object at 0x7f7bfd4cd208>