## Overview

This notebook is intended as teaching material for those who want an initial exposure to data engineering and machine learning. We go through basic data ingestion, pre-processing, organizing our data in the medallion lakehouse architecture (bronze, silver, and gold layers), and some very basic machine learning.

## Read in data files

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# CSV options for reading in csv files
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# read list of airports data into a dataframe 
airports = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load("/FileStore/tables/airports_list.csv")

# read airport weather data into a dataframe 
airport_weather = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load("/FileStore/tables/airport_weather_2019.csv")



In [0]:
display(airports.sample(fraction=0.1))

ORIGIN_AIRPORT_ID,DISPLAY_AIRPORT_NAME,ORIGIN_CITY_NAME,NAME
11259,Dallas Love Field,"Dallas, TX","DALLAS FAA AIRPORT, TX US"
12478,John F. Kennedy International,"New York, NY","LAGUARDIA AIRPORT, NY US"
12954,Long Beach Daugherty Field,"Long Beach, CA","LOS ANGELES INTERNATIONAL AIRPORT, CA US"
10693,Myrtle Beach International,"Myrtle Beach, SC","NORTH MYRTLE BEACH, SC US"
14831,San Jose International,"San Jose, CA","SAN JOSE INTERNATIONAL AIRPORT, CA US"
11292,Stapleton International,"Denver, CO","DENVER INTERNATIONAL AIRPORT, CO US"
13851,Will Rogers World,"Oklahoma City, OK","OKLAHOMA CITY WILL ROGERS WORLD AIRPORT, OK US"


In [0]:
display(airport_weather.sample(fraction=0.1))

STATION,NAME,DATE,AWND,PGTM,PRCP,SNOW,SNWD,TAVG,TMAX,TMIN,WDF2,WDF5,WSF2,WSF5,WT01,WT02,WT03,WT04,WT05,WT06,WT07,WT08,WT09,WESD,WT10,PSUN,TSUN,SN32,SX32,TOBS,WT11
USW00013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",1/17/2019,4.03,,0.1,0.0,0.0,42.0,50.0,35.0,180.0,180.0,8.9,13.0,1.0,,1.0,,,,,1.0,,,,,,,,,
USW00013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",1/18/2019,2.91,,0.0,0.0,0.0,50.0,58.0,47.0,220.0,220.0,8.1,10.1,1.0,,,,,,,,,,,,,,,,
USW00013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2/3/2019,4.47,,0.04,0.0,0.0,58.0,65.0,50.0,90.0,80.0,12.1,17.0,,,,,,,,,,,,,,,,,
USW00013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2/6/2019,7.38,,0.1,0.0,0.0,64.0,71.0,59.0,210.0,220.0,14.1,17.0,1.0,1.0,,,,,,,,,,,,,,,
USW00013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",3/4/2019,16.78,,0.0,0.0,0.0,42.0,44.0,34.0,320.0,330.0,28.0,34.0,,,,,,,,,,,,,,,,,
USW00013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",3/6/2019,11.18,,0.0,0.0,0.0,35.0,47.0,27.0,330.0,320.0,21.0,29.1,,,,,,,,,,,,,,,,,
USW00013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",3/12/2019,6.04,,0.0,0.0,0.0,57.0,74.0,46.0,40.0,40.0,12.1,14.1,,,,,,,,,,,,,,,,,
USW00013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",3/16/2019,12.75,,0.0,0.0,0.0,49.0,52.0,43.0,340.0,310.0,23.9,28.0,,,,,,,,,,,,,,,,,
USW00013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",3/22/2019,13.65,,0.0,0.0,0.0,52.0,66.0,40.0,330.0,330.0,23.9,30.0,,,,,,,,,,,,,,,,,
USW00013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",3/29/2019,4.7,,0.0,0.0,0.0,61.0,73.0,49.0,220.0,260.0,13.0,17.0,,,,,,,,,,,,,,,,,


In [0]:
airport_weather.columns

Out[5]: ['STATION',
 'NAME',
 'DATE',
 'AWND',
 'PGTM',
 'PRCP',
 'SNOW',
 'SNWD',
 'TAVG',
 'TMAX',
 'TMIN',
 'WDF2',
 'WDF5',
 'WSF2',
 'WSF5',
 'WT01',
 'WT02',
 'WT03',
 'WT04',
 'WT05',
 'WT06',
 'WT07',
 'WT08',
 'WT09',
 'WESD',
 'WT10',
 'PSUN',
 'TSUN',
 'SN32',
 'SX32',
 'TOBS',
 'WT11']

In [0]:
airport_weather.dtypes

Out[6]: [('STATION', 'string'),
 ('NAME', 'string'),
 ('DATE', 'string'),
 ('AWND', 'double'),
 ('PGTM', 'double'),
 ('PRCP', 'double'),
 ('SNOW', 'double'),
 ('SNWD', 'double'),
 ('TAVG', 'double'),
 ('TMAX', 'double'),
 ('TMIN', 'double'),
 ('WDF2', 'double'),
 ('WDF5', 'double'),
 ('WSF2', 'double'),
 ('WSF5', 'double'),
 ('WT01', 'double'),
 ('WT02', 'double'),
 ('WT03', 'double'),
 ('WT04', 'double'),
 ('WT05', 'double'),
 ('WT06', 'double'),
 ('WT07', 'double'),
 ('WT08', 'double'),
 ('WT09', 'double'),
 ('WESD', 'double'),
 ('WT10', 'double'),
 ('PSUN', 'double'),
 ('TSUN', 'double'),
 ('SN32', 'double'),
 ('SX32', 'double'),
 ('TOBS', 'double'),
 ('WT11', 'double')]

## Write raw loaded data files into Delta tables - Bronze layer

In a real-world production environment, the multi-hop Delta Lake architecture would be implemented by writing the bronze, silver, and gold tables to established locations in cloud storage. For our purposes, we'll simply write each layer of our architecture to separate Delta tables and label them as bronze, silver, and gold.

In [0]:
#write raw dataframes to delta table 
airports.write.mode("overwrite").format("delta").saveAsTable("airport_data_bronze")
airport_weather.write.mode("overwrite").format("delta").saveAsTable("airport_weather_bronze")

## Silver Layer

To kick off our data cleansing and validation process, we'll start by reading our bronze tables into DataFrames and then check whether our raw DataFrames contain any duplicate rows.

In [0]:
# read in bronze tables 
airport_data_bronze = spark.read.table("airport_data_bronze")
airport_weather_bronze = spark.read.table("airport_weather_bronze")


In [0]:
# Group each DataFrame by all of its columns and count the number of rows in each group.
# Then keep only the groups whose generated 'count' column is greater than 1, which indicates a duplicate row.

airport_duplicates = airport_data_bronze.groupby(airport_data_bronze.columns).count().filter(col("count") > 1)
airport_weather_duplicates = airport_weather_bronze.groupby(airport_weather_bronze.columns).count().filter(col("count") > 1)

airport_duplicates.show()
airport_weather_duplicates.show()

+-----------------+--------------------+----------------+----+-----+
|ORIGIN_AIRPORT_ID|DISPLAY_AIRPORT_NAME|ORIGIN_CITY_NAME|NAME|count|
+-----------------+--------------------+----------------+----+-----+
+-----------------+--------------------+----------------+----+-----+

+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+
|STATION|NAME|DATE|AWND|PGTM|PRCP|SNOW|SNWD|TAVG|TMAX|TMIN|WDF2|WDF5|WSF2|WSF5|WT01|WT02|WT03|WT04|WT05|WT06|WT07|WT08|WT09|WESD|WT10|PSUN|TSUN|SN32|SX32|TOBS|WT11|count|
+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+
+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+



Great! Neither DataFrame contains duplicate rows (don't get too used to this).
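The group-by-every-column-and-count check can be mirrored in plain Python, which may make the logic easier to see. Each full row acts as the grouping key, occurrences are counted, and only keys seen more than once are kept. This is an illustration only: `find_duplicate_rows` is a hypothetical helper and the sample rows are made up.

```python
from collections import Counter


def find_duplicate_rows(rows):
    """Return {row: count} for every row that appears more than once.

    Mirrors df.groupby(df.columns).count().filter(col("count") > 1):
    the whole row (all columns) is the grouping key.
    """
    counts = Counter(tuple(row) for row in rows)
    return {row: n for row, n in counts.items() if n > 1}


# Hypothetical rows: (ORIGIN_AIRPORT_ID, ORIGIN_CITY_NAME)
sample = [
    (11259, "Dallas, TX"),
    (14831, "San Jose, CA"),
    (11259, "Dallas, TX"),  # exact duplicate of the first row
]
print(find_duplicate_rows(sample))  # {(11259, 'Dallas, TX'): 2}
```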

Now let's check for missing data in our tables. We'll likely have some missing data in the temperature and precipitation columns, which we'll deal with later. For now, let's make sure there's no missing data in the airports table or in the NAME and DATE columns of the weather table.

In [0]:
airport_data_bronze.select([count(when(isnull(c), c)).alias(c) for c in airport_data_bronze.columns]).show()

airport_weather_bronze.select([count(when(isnull(c), c)).alias(c) for c in ['NAME', 'DATE']]).show()

+-----------------+--------------------+----------------+----+
|ORIGIN_AIRPORT_ID|DISPLAY_AIRPORT_NAME|ORIGIN_CITY_NAME|NAME|
+-----------------+--------------------+----------------+----+
|                0|                   0|               0|   0|
+-----------------+--------------------+----------------+----+

+----+----+
|NAME|DATE|
+----+----+
|   0|   0|
+----+----+
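The null-count expression `count(when(isnull(c), c))` is worth unpacking. Here `c` is a column *name* (a Python string). As far as we understand PySpark, a non-Column value passed to `when` is treated as a literal. So `when(isnull(c), c)` yields the string `c` for rows where the column is null, and null otherwise. Because `count` skips nulls, the result is the number of null cells.

A plain-Python sketch of the same per-column tally follows. It uses `None` for null and made-up rows; `count_nulls` is a hypothetical helper.

```python
def count_nulls(rows, columns):
    """Count None values per column, mirroring
    df.select([count(when(isnull(c), c)).alias(c) for c in columns]).
    """
    totals = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            # when(isnull(c), c) produces a non-null marker only for null cells,
            # and count() tallies those markers.
            if row.get(c) is None:
                totals[c] += 1
    return totals


rows = [
    {"NAME": "ALBANY INTERNATIONAL AIRPORT, NY US", "DATE": "1/1/2019"},
    {"NAME": "ALBANY INTERNATIONAL AIRPORT, NY US", "DATE": None},
]
print(count_nulls(rows, ["NAME", "DATE"]))  # {'NAME': 0, 'DATE': 1}
```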



Now we'll perform a left join of the airports table with the weather table on NAME. Every airport is kept, and the weather rows that match each airport are attached to it.
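A left outer join keeps every row of the left table (airports here) and attaches each matching right-hand row:

- An airport that matches many weather rows appears once per match.
- An airport with no weather rows still appears once, with nulls in the weather columns.

The second case is consistent with the single null in STATION and DATE in the null counts of the joined table. Below is a plain-Python sketch of these semantics, using a hypothetical `left_join` helper and made-up rows.

```python
def left_join(left, right, key):
    """Left outer join of two lists of dicts on `key`.

    Every left row is kept. Each match yields one output row; a left row
    with no match is emitted once with the right-only columns set to None.
    Note: unlike SQL, this sketch would also match None keys to each other.
    """
    right_cols = sorted({c for r in right for c in r if c != key})
    joined = []
    for lrow in left:
        matches = [rrow for rrow in right if rrow[key] == lrow[key]]
        if matches:
            for rrow in matches:
                joined.append({**lrow, **rrow})
        else:
            joined.append({**lrow, **{c: None for c in right_cols}})
    return joined


airports = [
    {"NAME": "STATION A", "ORIGIN_CITY_NAME": "Dallas, TX"},
    {"NAME": "STATION B", "ORIGIN_CITY_NAME": "Aspen, CO"},
]
weather = [
    {"NAME": "STATION A", "DATE": "1/1/2019"},
    {"NAME": "STATION A", "DATE": "1/2/2019"},
]
result = left_join(airports, weather, "NAME")
for row in result:
    print(row)
```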

In [0]:
joined_df = (airport_data_bronze
             .join(airport_weather_bronze, ['NAME'], "left_outer")
            )

In [0]:
joined_df.columns

Out[49]: ['NAME',
 'ORIGIN_AIRPORT_ID',
 'DISPLAY_AIRPORT_NAME',
 'ORIGIN_CITY_NAME',
 'STATION',
 'DATE',
 'AWND',
 'PGTM',
 'PRCP',
 'SNOW',
 'SNWD',
 'TAVG',
 'TMAX',
 'TMIN',
 'WDF2',
 'WDF5',
 'WSF2',
 'WSF5',
 'WT01',
 'WT02',
 'WT03',
 'WT04',
 'WT05',
 'WT06',
 'WT07',
 'WT08',
 'WT09',
 'WESD',
 'WT10',
 'PSUN',
 'TSUN',
 'SN32',
 'SX32',
 'TOBS',
 'WT11']

In [0]:
joined_df.select([count(when(isnull(c), c)).alias(c) for c in ['NAME',
 'ORIGIN_AIRPORT_ID',
 'DISPLAY_AIRPORT_NAME',
 'ORIGIN_CITY_NAME',
 'STATION',
 'DATE',
 'AWND',
 'PGTM',
 'PRCP',
 'SNOW',
 'SNWD',
 'TAVG',
 'TMAX',
 'TMIN']]).show()

+----+-----------------+--------------------+----------------+-------+----+----+-----+----+-----+-----+----+----+----+
|NAME|ORIGIN_AIRPORT_ID|DISPLAY_AIRPORT_NAME|ORIGIN_CITY_NAME|STATION|DATE|AWND| PGTM|PRCP| SNOW| SNWD|TAVG|TMAX|TMIN|
+----+-----------------+--------------------+----------------+-------+----+----+-----+----+-----+-----+----+----+----+
|   0|                0|                   0|               0|      1|   1|   9|31591|  22|11060|11565|7013|   6|   9|
+----+-----------------+--------------------+----------------+-------+----+----+-----+----+-----+-----+----+----+----+



In [0]:
airport_weather_bronze.select([count(when(isnull(c), c)).alias(c) for c in airport_weather_bronze.columns]).show()

+-------+----+----+----+-----+----+-----+-----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|STATION|NAME|DATE|AWND| PGTM|PRCP| SNOW| SNWD|TAVG|TMAX|TMIN|WDF2|WDF5|WSF2|WSF5| WT01| WT02| WT03| WT04| WT05| WT06| WT07| WT08| WT09| WESD| WT10| PSUN| TSUN| SN32| SX32| TOBS| WT11|
+-------+----+----+----+-----+----+-----+-----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|      0|   0|   0| 381|34737|  26|11768|12290|9933|  20|  21| 370| 481| 370| 481|24824|36865|33892|38409|38556|38269|38650|33950|38405|38669|38670|38336|38337|38310|38310|38320|38674|
+-------+----+----+----+-----+----+-----+-----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+



The NAME and DATE columns are complete, so we're good to go. Let's move on to transforming our data and doing some feature engineering.

## Perform necessary transformations to Delta tables and write to the silver layer

Transformation objectives:
* Create a new "ORIGIN_STATE" column in the airports table that indicates the state the airport is located in
* Convert "DATE" column in the weather table to a date type 
* Create a new "MONTH" column in the weather table 
* The temperature columns in the weather table are currently in Fahrenheit, convert the "TAVG", "TMAX", and "TMIN" columns to Celsius and rename them to "TAVG_C", "TMAX_C", and "TMIN_C"
* The columns "PRCP", "SNOW", and "SNWD" are in inches, convert the measurements in these columns to mm and rename them to "PRCP_MM", "SNOW_MM", and "SNWD_MM"
* Drop unnecessary columns in the weather table so that the new transformed table includes only the following columns: 'NAME','DATE','MONTH','AWND','PRCP_MM','SNOW_MM','SNWD_MM','TAVG_C','TMAX_C','TMIN_C'
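As a sanity check on the arithmetic, here is a plain-Python sketch of the per-record transformations listed above. It uses:

- C = (F - 32) / 1.8
- mm = in × 25.4
- rounding to two decimals, as the notebook does
- the state as the last two characters of ORIGIN_CITY_NAME

The helper names are illustrative, not the notebook's Spark functions. `None` stands in for a Spark null and is passed through unchanged.

```python
from datetime import datetime


def f_to_c(f):
    """Fahrenheit -> Celsius, rounded to 2 decimals; None stays None (like a Spark null)."""
    return None if f is None else round((f - 32) / 1.8, 2)


def in_to_mm(inches):
    """Inches -> millimetres, rounded to 2 decimals; None stays None."""
    return None if inches is None else round(inches * 25.4, 2)


def origin_state(city_name):
    """'Dallas, TX' -> 'TX': the last two characters, like substr(-2, 2)."""
    return city_name[-2:]


def transform_record(rec):
    """Apply the silver-layer transformations to one raw weather record (a dict)."""
    parsed = datetime.strptime(rec["DATE"], "%m/%d/%Y").date()
    return {
        "NAME": rec["NAME"],
        "DATE": parsed,
        "MONTH": parsed.month,
        "AWND": rec["AWND"],
        "PRCP_MM": in_to_mm(rec["PRCP"]),
        "SNOW_MM": in_to_mm(rec["SNOW"]),
        "SNWD_MM": in_to_mm(rec["SNWD"]),
        "TAVG_C": f_to_c(rec["TAVG"]),
        "TMAX_C": f_to_c(rec["TMAX"]),
        "TMIN_C": f_to_c(rec["TMIN"]),
    }


# Values taken from the 1/17/2019 Atlanta row in the weather sample output above
raw = {
    "NAME": "ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",
    "DATE": "1/17/2019", "AWND": 4.03, "PRCP": 0.1, "SNOW": 0.0,
    "SNWD": 0.0, "TAVG": 42.0, "TMAX": 50.0, "TMIN": 35.0,
}
print(transform_record(raw))
```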

In [0]:
# Create a new state column in the airports table
# We'll do this by extracting the last two characters of the ORIGIN_CITY_NAME column (e.g. "Dallas, TX" -> "TX")
airports_silver = airport_data_bronze.withColumn("ORIGIN_STATE", col("ORIGIN_CITY_NAME").substr(-2,2))

In [0]:
display(airports_silver)

ORIGIN_AIRPORT_ID,DISPLAY_AIRPORT_NAME,ORIGIN_CITY_NAME,NAME,ORIGIN_STATE
12992,Adams Field,"Little Rock, AR","NORTH LITTLE ROCK AIRPORT, AR US",AR
10257,Albany International,"Albany, NY","ALBANY INTERNATIONAL AIRPORT, NY US",NY
10140,Albuquerque International Sunport,"Albuquerque, NM","ALBUQUERQUE INTERNATIONAL AIRPORT, NM US",NM
10299,Anchorage International,"Anchorage, AK","ANCHORAGE TED STEVENS INTERNATIONAL AIRPORT, AK US",AK
10397,Atlanta Municipal,"Atlanta, GA","ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",GA
10423,Austin - Bergstrom International,"Austin, TX","AUSTIN BERGSTROM INTERNATIONAL AIRPORT, TX US",TX
10599,Birmingham Airport,"Birmingham, AL","BIRMINGHAM AIRPORT, AL US",AL
10529,Bradley International,"Hartford, CT","HARTFORD BRADLEY INTERNATIONAL AIRPORT, CT US",CT
10994,Charleston International,"Charleston, SC","CHARLESTON INTL. AIRPORT, SC US",SC
13232,Chicago Midway International,"Chicago, IL","CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",IL


In [0]:
airports_silver.dtypes

Out[14]: [('ORIGIN_AIRPORT_ID', 'int'),
 ('DISPLAY_AIRPORT_NAME', 'string'),
 ('ORIGIN_CITY_NAME', 'string'),
 ('NAME', 'string'),
 ('ORIGIN_STATE', 'string')]

In [0]:
# The format of the dates in our DATE column is not accepted by the parser behind the conversion function we're about to use, specifically in the version of Spark we're running (Spark 3.0).
# The line below is a quick fix: it changes the configuration to restore the pre-Spark 3.0 parsing behavior, which allows the parsing to succeed.
# NOTE: This is not best practice, but it doesn't detract from the objective of this notebook: understanding the multi-hop Delta Lake architecture.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Let's start by defining functions for the unit conversions.
# Both take and return Spark Columns. Because of `from pyspark.sql.functions import *`,
# `round` below is pyspark.sql.functions.round (not Python's built-in), so it builds a column expression.

# Fahrenheit to Celsius
def f_to_c(F):
    C = (F - 32) / 1.8
    return round(C, 2)

# inches to mm
def in_to_mm(IN):
    mm = IN * 25.4
    return round(mm, 2)

# apply the unit conversions to the respective columns in the weather table and select a subset of columns for the final table
airport_weather_silver = (airport_weather_bronze
                            .withColumn("DATE", to_date(col("DATE"), "MM/dd/yyyy"))
                            .withColumn("MONTH", month(col('DATE')))
                            .withColumn('TAVG_C', f_to_c(col('TAVG')))
                            .withColumn('TMAX_C', f_to_c(col('TMAX')))
                            .withColumn('TMIN_C', f_to_c(col('TMIN')))
                            .withColumn('PRCP_MM', in_to_mm(col('PRCP')))
                            .withColumn('SNOW_MM', in_to_mm(col('SNOW')))
                            .withColumn('SNWD_MM', in_to_mm(col('SNWD')))
                         ).select( 'NAME','DATE','MONTH','AWND','PRCP_MM','SNOW_MM','SNWD_MM','TAVG_C', 'TMAX_C','TMIN_C')

In [0]:
display(airport_weather_silver)

NAME,DATE,MONTH,AWND,PRCP_MM,SNOW_MM,SNWD_MM,TAVG_C,TMAX_C,TMIN_C
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-01,1,4.7,3.56,0.0,0.0,17.78,18.89,13.89
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-02,1,4.92,14.48,0.0,0.0,13.33,15.0,9.44
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-03,1,5.37,3.81,0.0,0.0,11.11,12.78,10.56
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-04,1,12.08,36.58,0.0,0.0,13.33,18.89,7.22
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-05,1,13.42,0.0,0.0,0.0,9.44,15.0,6.67
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-06,1,6.49,0.0,0.0,0.0,11.67,20.56,6.11
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-07,1,5.14,0.0,0.0,0.0,12.78,20.56,7.22
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-08,1,8.05,0.0,0.0,0.0,13.89,18.33,10.0
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-09,1,17.45,0.0,0.0,0.0,10.0,13.33,3.89
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-10,1,13.2,0.0,0.0,0.0,3.33,7.22,-1.11


Let's take a look at the data types of our columns after transforming the weather table.

In [0]:
airport_weather_silver.dtypes

Out[19]: [('NAME', 'string'),
 ('DATE', 'date'),
 ('MONTH', 'int'),
 ('AWND', 'double'),
 ('PRCP_MM', 'double'),
 ('SNOW_MM', 'double'),
 ('SNWD_MM', 'double'),
 ('TAVG_C', 'double'),
 ('TMAX_C', 'double'),
 ('TMIN_C', 'double')]

Let's re-check for missing values in our transformed airport_weather_silver table, this time including our new MONTH column in the check.

In [0]:
# Count the number of null values in the NAME, DATE, and MONTH columns
airport_weather_silver.select([count(when(isnull(c), c)).alias(c) for c in ['NAME', 'DATE', 'MONTH']]).show()

# Count the number of null or NaN values in each column
# airport_weather_silver.select([count(when(isnull(c) | isnan(c), c)).alias(c) for c in airport_weather_silver.columns]).show()


+----+-----+-----+
|NAME| DATE|MONTH|
+----+-----+-----+
|   0|22252|22252|
+----+-----+-----+



Looks like we now have some null values in our transformed dataframe. Let's inspect the dataframe to see why that may have happened.

In [0]:

display(airport_weather_silver.filter(isnull("DATE")).sample(0.2))

NAME,DATE,MONTH,AWND,PRCP_MM,SNOW_MM,SNWD_MM,TAVG_C,TMAX_C,TMIN_C
"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",,,4.25,0.0,,,,-6.67,-23.89
"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",,,3.8,0.0,,,,5.56,-14.44
"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",,,6.93,0.0,,,,3.33,-12.22
"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",,,5.14,0.0,,,,6.11,-10.56
"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",,,6.04,0.0,,,,5.0,-14.44
"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",,,5.82,3.3,,,,6.67,-5.56
"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",,,8.28,0.0,,,,3.89,-15.56
"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",,,7.16,1.27,,,,4.44,-6.11
"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",,,4.47,0.25,,,,-5.56,-18.33
"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",,,4.92,0.0,,,,7.78,-11.11


It seems there's a subset of airports for which the DATE conversion didn't work as intended. Let's collect those airports into a list and use it to check what the original DATE values looked like for them.

In [0]:
# Create a single-column DataFrame with the distinct names of airports that have a null date
null_dates_df = airport_weather_silver.filter(isnull("DATE"))
column_values = null_dates_df.select("NAME").distinct()

# Collect the airport names into a Python list
null_date_airports = [row.NAME for row in column_values.collect()]
print(null_date_airports)


['GREENSBORO AIRPORT, NC US', 'BIRMINGHAM AIRPORT, AL US', 'WICHITA COLONEL JAMES JABARA AIRPORT, KS US', 'ANCHORAGE TED STEVENS INTERNATIONAL AIRPORT, AK US', 'GREENVILLE DOWNTOWN AIRPORT, SC US', 'FAYETTEVILLE SPRINGDALE NW AR REGL AIRPORT, AR US', 'WILMINGTON INTERNATIONAL AIRPORT, NC US', 'EL PASO INTERNATIONAL AIRPORT, TX US', 'LOUISVILLE INTERNATIONAL AIRPORT, KY US', 'ALBUQUERQUE INTERNATIONAL AIRPORT, NM US', 'ASHEVILLE AIRPORT, NC US', 'ALBANY INTERNATIONAL AIRPORT, NY US', 'HARTFORD BRADLEY INTERNATIONAL AIRPORT, CT US', 'SAN JUAN L M MARIN INTERNATIONAL AIRPORT, PR US', 'HOUSTON WILLIAM P HOBBY AIRPORT, TX US', 'ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US', 'LIHUE WEATHER SERVICE OFFICE AIRPORT 1020.1, HI US', 'LEXINGTON BLUEGRASS AIRPORT, KY US', 'KNOXVILLE AIRPORT, TN US', 'FRESNO YOSEMITE INTERNATIONAL, CA US', 'SAVANNAH INTERNATIONAL AIRPORT, GA US', 'GRAND RAPIDS GERALD R FORD INTERNATIONAL AIRPORT, MI US', 'DAYTON WRIGHT BROTHERS AIRPORT, OH US', 'JACKSON INTERNATIONAL 

We already know there weren't any null values in the DATE column of the original data. Let's check what the date values were for these airports by filtering the bronze weather table to only the airports we collected in null_date_airports.

In [0]:
# display filtered airport_weather_bronze dataframe
display(airport_weather_bronze.filter(col("NAME").isin(null_date_airports)).sample(0.2))

STATION,NAME,DATE,AWND,PGTM,PRCP,SNOW,SNWD,TAVG,TMAX,TMIN,WDF2,WDF5,WSF2,WSF5,WT01,WT02,WT03,WT04,WT05,WT06,WT07,WT08,WT09,WESD,WT10,PSUN,TSUN,SN32,SX32,TOBS,WT11
USW00093073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2019-01-01,3.13,,0.0,,,,17.0,-8.0,320.0,300.0,8.1,13.0,1.0,,,,,,,1.0,,,,,,,,,
USW00093073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2019-01-04,4.92,,0.0,,,,37.0,6.0,170.0,170.0,12.1,14.1,,,,,,,,1.0,,,,,,,,,
USW00093073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2019-01-05,3.8,,0.0,,,,42.0,6.0,190.0,310.0,8.9,13.0,,,,,,,,,,,,,,,,,
USW00093073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2019-01-11,4.25,,0.17,,,,31.0,25.0,10.0,10.0,12.1,17.0,1.0,,,,,,,1.0,,,,,,,,,
USW00093073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2019-01-12,3.8,,0.0,,,,32.0,11.0,210.0,350.0,8.1,10.1,1.0,,,,,,,1.0,,,,,,,,,
USW00093073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2019-01-16,3.8,,0.16,,,,34.0,22.0,350.0,360.0,12.1,16.1,1.0,,,,,,,1.0,,,,,,,,,
USW00093073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2019-01-21,5.14,,0.25,,,,40.0,20.0,190.0,180.0,21.0,29.1,1.0,1.0,,,,,,1.0,,,,,,,,,
USW00093073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2019-01-27,4.25,,0.0,,,,35.0,6.0,190.0,150.0,13.0,17.0,,,,,,,,,,,,,,,,,
USW00093073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2019-01-31,6.26,,0.0,,,,38.0,2.0,220.0,220.0,15.0,16.1,,,,,,,,,,,,,,,,,
USW00093073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2019-02-01,6.04,,0.0,,,,41.0,6.0,200.0,210.0,10.1,13.0,,,,,,,,,,,,,,,,,


Looks like the raw data contains dates in two different formats, which causes problems when converting the column to a date type. We initially assumed the format was MM/dd/yyyy, but as we can see, a subset of airports use the yyyy-MM-dd format.

We'll have to convert the data type for the DATE column in our silver layer conditionally based on the format the original data is in.

Let's re-run our transformation cell, but this time with a conditional transformation for the DATE column. If the airport NAME is in the group of airports whose original dates use yyyy-MM-dd, the column is parsed with that pattern; otherwise, it is parsed with MM/dd/yyyy.
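An alternative to keying the format off a list of airport names is to try each known pattern in turn and keep the first one that parses. In PySpark this would look roughly like `coalesce(to_date(col("DATE"), "yyyy-MM-dd"), to_date(col("DATE"), "MM/dd/yyyy"))`.

That approach relies on `to_date` returning null for values it cannot parse, which the first attempt above suggests it does under the legacy parser setting. It is worth verifying on the real data that neither pattern accidentally parses the other format.

A plain-Python sketch of the idea follows; `parse_date` and `KNOWN_FORMATS` are hypothetical names.

```python
from datetime import date, datetime
from typing import Optional

# Patterns seen in the raw DATE column, in Python strptime syntax
KNOWN_FORMATS = ("%Y-%m-%d", "%m/%d/%Y")


def parse_date(text: str, formats=KNOWN_FORMATS) -> Optional[date]:
    """Return the first successful parse of `text`, or None if no format matches.

    Mirrors coalesce(to_date(c, fmt1), to_date(c, fmt2), ...): each attempt
    yields None on failure and the first non-None result wins.
    """
    for fmt in formats:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None


print(parse_date("2019-01-04"))  # 2019-01-04
print(parse_date("1/17/2019"))   # 2019-01-17
print(parse_date("17.01.2019"))  # None
```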

In [0]:
# The format of the dates in our DATE column is not accepted by the parser behind the conversion function we're about to use, specifically in the version of Spark we're running (Spark 3.0).
# The line below is a quick fix: it changes the configuration to restore the pre-Spark 3.0 parsing behavior, which allows the parsing to succeed.
# NOTE: This is not best practice, but it doesn't detract from the objective of this notebook: understanding the multi-hop Delta Lake architecture.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Let's start by defining functions for the unit conversions.
# `round` is pyspark.sql.functions.round here (from the wildcard import), so these build column expressions.

# Fahrenheit to Celsius
def f_to_c(F):
    C = (F - 32) / 1.8
    return round(C, 2)

# inches to mm
def in_to_mm(IN):
    mm = IN * 25.4
    return round(mm, 2)

# Build the silver table from the bronze table: parse DATE with the pattern that matches each airport's format,
# apply the unit conversions to the respective columns, and select a subset of columns for the final table
airport_weather_silver = (airport_weather_bronze
                            .withColumn("DATE", 
                                        when(col("NAME").isin(null_date_airports), to_date(col("DATE"), "yyyy-MM-dd"))
                                        .otherwise(to_date(col("DATE"), "MM/dd/yyyy")))
                            .withColumn("MONTH", month(col('DATE')))
                            .withColumn('TAVG_C', f_to_c(col('TAVG')))
                            .withColumn('TMAX_C', f_to_c(col('TMAX')))
                            .withColumn('TMIN_C', f_to_c(col('TMIN')))
                            .withColumn('PRCP_MM', in_to_mm(col('PRCP')))
                            .withColumn('SNOW_MM', in_to_mm(col('SNOW')))
                            .withColumn('SNWD_MM', in_to_mm(col('SNWD')))
                         ).select( 'NAME','DATE','MONTH','AWND','PRCP_MM','SNOW_MM','SNWD_MM','TAVG_C', 'TMAX_C','TMIN_C')

In [0]:
# Check data types
display(airport_weather_silver.dtypes)

_1,_2
NAME,string
DATE,date
MONTH,int
AWND,double
PRCP_MM,double
SNOW_MM,double
SNWD_MM,double
TAVG_C,double
TMAX_C,double
TMIN_C,double


In [0]:
# Check for null values
airport_weather_silver.select([count(when(isnull(c), c)).alias(c) for c in ['NAME', 'DATE', 'MONTH']]).show()

+----+----+-----+
|NAME|DATE|MONTH|
+----+----+-----+
|   0|   0|    0|
+----+----+-----+



Problem solved!

Now let's check the remaining columns in the airport_weather_silver DataFrame for missing values.

In [0]:
# Count the number of null values in each of the remaining columns
airport_weather_silver.select([count(when(isnull(c), c)).alias(c) for c in ['AWND','PRCP_MM','SNOW_MM','SNWD_MM','TAVG_C', 'TMAX_C','TMIN_C']]).show()

+----+-------+-------+-------+------+------+------+
|AWND|PRCP_MM|SNOW_MM|SNWD_MM|TAVG_C|TMAX_C|TMIN_C|
+----+-------+-------+-------+------+------+------+
| 381|     26|  11768|  12290|  9933|    20|    21|
+----+-------+-------+-------+------+------+------+



As expected, we have a lot of missing weather data. 

There are multiple ways of addressing missing data in big data projects, some of which are:
- Deleting rows with missing values
- Replacing missing values with a constant 
- Predicting missing values 

We'll use some of these methods to address the missing values in our columns.

The amount of missing data in the PRCP_MM, TMAX_C, and TMIN_C columns is small (26, 20, and 21 rows, respectively), so let's drop the rows with missing values in these columns.
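`na.drop(subset=[...])` uses `how="any"` by default. It removes a row if any of the listed columns is null and ignores nulls in other columns. That is why AWND, SNOW_MM, SNWD_MM and TAVG_C can still contain nulls afterwards.

The plain-Python sketch below mirrors that behaviour on made-up rows. `drop_rows_with_nulls` is a hypothetical helper, not part of PySpark.

```python
def drop_rows_with_nulls(rows, subset, how="any"):
    """Mirror DataFrame.na.drop(how=..., subset=...) for a list of dicts.

    how="any": drop a row if ANY subset column is None.
    how="all": drop a row only if ALL subset columns are None.
    Columns outside `subset` are not considered.
    """
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    check = any if how == "any" else all
    return [r for r in rows if not check(r.get(c) is None for c in subset)]


rows = [
    {"PRCP_MM": 0.0, "TMAX_C": 10.0, "TMIN_C": 1.67, "AWND": None},
    {"PRCP_MM": None, "TMAX_C": 12.0, "TMIN_C": 3.0, "AWND": 4.0},
]
kept = drop_rows_with_nulls(rows, ["PRCP_MM", "TMAX_C", "TMIN_C"])
print(len(kept))  # 1 (the AWND null in the first row is not checked)
```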

In [0]:
# drop rows with null values in columns "PRCP_MM", "TMAX_C", and "TMIN_C"
airport_weather_silver_2 = airport_weather_silver.na.drop(subset=["PRCP_MM", "TMAX_C", "TMIN_C"])
airport_weather_silver_2.select([count(when(isnull(c), c)).alias(c) for c in ['AWND','PRCP_MM','SNOW_MM','SNWD_MM','TAVG_C', 'TMAX_C','TMIN_C']]).show()

+----+-------+-------+-------+------+------+------+
|AWND|PRCP_MM|SNOW_MM|SNWD_MM|TAVG_C|TMAX_C|TMIN_C|
+----+-------+-------+-------+------+------+------+
| 367|      0|  11736|  12264|  9895|     0|     0|
+----+-------+-------+-------+------+------+------+



Next, to resolve the missing values in the AWND column (average daily wind speed), we're going to replace them with the average AWND for the same city and month. Let's start by joining our weather DataFrame with our airports DataFrame so that we have access to the city column (ORIGIN_CITY_NAME) from the airports table.
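The imputation described above amounts to two steps:

1. Compute the mean AWND over the non-null values in each (city, month) group.
2. Fill each null with its group's mean.

In PySpark, one common way to do this combines a window such as `Window.partitionBy("ORIGIN_CITY_NAME", "MONTH")` with `avg("AWND").over(...)` and `coalesce`. The notebook may implement it differently.

The plain-Python sketch below shows the logic on made-up rows; `fill_with_group_mean` is a hypothetical helper. Groups with no observed values stay null.

```python
from collections import defaultdict


def fill_with_group_mean(rows, value_col, group_cols):
    """Fill None in `value_col` with the mean of non-null values in the same group.

    Groups are keyed by the tuple of `group_cols`. A group whose values are all
    None has no mean, so its rows keep None (like avg() over an all-null window).
    Returns new dicts; the input rows are not modified.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for r in rows:
        if r[value_col] is not None:
            key = tuple(r[c] for c in group_cols)
            sums[key] += r[value_col]
            counts[key] += 1

    filled = []
    for r in rows:
        key = tuple(r[c] for c in group_cols)
        if r[value_col] is None and counts.get(key, 0) > 0:
            r = {**r, value_col: sums[key] / counts[key]}
        filled.append(r)
    return filled


rows = [
    {"ORIGIN_CITY_NAME": "Atlanta, GA", "MONTH": 1, "AWND": 4.0},
    {"ORIGIN_CITY_NAME": "Atlanta, GA", "MONTH": 1, "AWND": 6.0},
    {"ORIGIN_CITY_NAME": "Atlanta, GA", "MONTH": 1, "AWND": None},
    {"ORIGIN_CITY_NAME": "Aspen, CO", "MONTH": 2, "AWND": None},
]
filled = fill_with_group_mean(rows, "AWND", ["ORIGIN_CITY_NAME", "MONTH"])
print([r["AWND"] for r in filled])  # [4.0, 6.0, 5.0, None]
```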

In [0]:
joined_df = (airport_weather_silver_2
             .join(airports_silver, ['NAME'], "left_outer")
            )

In [0]:
display(airport_weather_silver_2)

NAME,DATE,MONTH,AWND,PRCP_MM,SNOW_MM,SNWD_MM,TAVG_C,TMAX_C,TMIN_C
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-01,1,4.7,3.56,0.0,0.0,17.78,18.89,13.89
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-02,1,4.92,14.48,0.0,0.0,13.33,15.0,9.44
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-03,1,5.37,3.81,0.0,0.0,11.11,12.78,10.56
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-04,1,12.08,36.58,0.0,0.0,13.33,18.89,7.22
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-05,1,13.42,0.0,0.0,0.0,9.44,15.0,6.67
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-06,1,6.49,0.0,0.0,0.0,11.67,20.56,6.11
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-07,1,5.14,0.0,0.0,0.0,12.78,20.56,7.22
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-08,1,8.05,0.0,0.0,0.0,13.89,18.33,10.0
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-09,1,17.45,0.0,0.0,0.0,10.0,13.33,3.89
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-10,1,13.2,0.0,0.0,0.0,3.33,7.22,-1.11


In [0]:
display(joined_df)

NAME,DATE,MONTH,AWND,PRCP_MM,SNOW_MM,SNWD_MM,TAVG_C,TMAX_C,TMIN_C,ORIGIN_AIRPORT_ID,DISPLAY_AIRPORT_NAME,ORIGIN_CITY_NAME,ORIGIN_STATE
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-01,1,4.7,3.56,0.0,0.0,17.78,18.89,13.89,10397,Atlanta Municipal,"Atlanta, GA",GA
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-02,1,4.92,14.48,0.0,0.0,13.33,15.0,9.44,10397,Atlanta Municipal,"Atlanta, GA",GA
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-03,1,5.37,3.81,0.0,0.0,11.11,12.78,10.56,10397,Atlanta Municipal,"Atlanta, GA",GA
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-04,1,12.08,36.58,0.0,0.0,13.33,18.89,7.22,10397,Atlanta Municipal,"Atlanta, GA",GA
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-05,1,13.42,0.0,0.0,0.0,9.44,15.0,6.67,10397,Atlanta Municipal,"Atlanta, GA",GA
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-06,1,6.49,0.0,0.0,0.0,11.67,20.56,6.11,10397,Atlanta Municipal,"Atlanta, GA",GA
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-07,1,5.14,0.0,0.0,0.0,12.78,20.56,7.22,10397,Atlanta Municipal,"Atlanta, GA",GA
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-08,1,8.05,0.0,0.0,0.0,13.89,18.33,10.0,10397,Atlanta Municipal,"Atlanta, GA",GA
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-09,1,17.45,0.0,0.0,0.0,10.0,13.33,3.89,10397,Atlanta Municipal,"Atlanta, GA",GA
"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2019-01-10,1,13.2,0.0,0.0,0.0,3.33,7.22,-1.11,10397,Atlanta Municipal,"Atlanta, GA",GA


In [0]:
joined_df.select([count(when(isnull(c), c)).alias(c) for c in joined_df.columns]).show()

+----+----+-----+----+-------+-------+-------+------+------+------+-----------------+--------------------+----------------+------------+
|NAME|DATE|MONTH|AWND|PRCP_MM|SNOW_MM|SNWD_MM|TAVG_C|TMAX_C|TMIN_C|ORIGIN_AIRPORT_ID|DISPLAY_AIRPORT_NAME|ORIGIN_CITY_NAME|ORIGIN_STATE|
+----+----+-----+----+-------+-------+-------+------+------+------+-----------------+--------------------+----------------+------------+
|   0|   0|    0| 367|      0|  14195|  14723| 10254|     0|     0|             7639|                7639|            7639|        7639|
+----+----+-----+----+-------+-------+-------+------+------+------+-----------------+--------------------+----------------+------------+



In [0]:
display(airport_weather_silver_2.select('NAME').distinct())

NAME
"GREENSBORO AIRPORT, NC US"
"BIRMINGHAM AIRPORT, AL US"
"SEATTLE TACOMA AIRPORT, WA US"
"AUSTIN BERGSTROM INTERNATIONAL AIRPORT, TX US"
"MINNEAPOLIS ST. PAUL INTERNATIONAL AIRPORT, MN US"
"WICHITA COLONEL JAMES JABARA AIRPORT, KS US"
"PITTSBURGH ALLEGHENY CO AIRPORT, PA US"
"NASHVILLE INTERNATIONAL AIRPORT, TN US"
"SAN JOSE INTERNATIONAL AIRPORT, CA US"
"ANCHORAGE TED STEVENS INTERNATIONAL AIRPORT, AK US"


In [0]:
display(airports_silver.select('NAME').distinct())

NAME
"GREENSBORO AIRPORT, NC US"
"BIRMINGHAM AIRPORT, AL US"
"SEATTLE TACOMA AIRPORT, WA US"
"AUSTIN BERGSTROM INTERNATIONAL AIRPORT, TX US"
"MINNEAPOLIS ST. PAUL INTERNATIONAL AIRPORT, MN US"
"PITTSBURGH ALLEGHENY CO AIRPORT, PA US"
"SPOKANE INTERNATIONAL AIRPORT, WA US"
"SAN JOSE INTERNATIONAL AIRPORT, CA US"
"ANCHORAGE TED STEVENS INTERNATIONAL AIRPORT, AK US"
"HONOLULU INTERNATIONAL AIRPORT, HI US"


In [0]:

# display(airport_weather_silver_2.filter(isnull("SNOW_MM") | isnull("SNWD_MM")).sample(0.2))

display(airport_weather_silver_2.filter(isnull("TAVG_C")))

NAME,DATE,MONTH,AWND,PRCP_MM,SNOW_MM,SNWD_MM,TAVG_C,TMAX_C,TMIN_C
"CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US",2019-01-01,1,5.82,0.0,,,,11.67,5.0
"CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US",2019-01-02,1,5.82,0.0,,,,6.67,3.89
"CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US",2019-01-03,1,3.8,0.0,,,,6.11,-0.56
"CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US",2019-01-04,1,5.82,6.6,,,,11.67,-1.67
"CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US",2019-01-05,1,6.93,0.0,,,,11.11,1.67
"CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US",2019-01-06,1,3.8,0.0,,,,11.67,-1.11
"CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US",2019-01-07,1,10.07,0.0,,,,16.11,6.11
"CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US",2019-01-08,1,15.43,0.0,,,,18.33,5.56
"CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US",2019-01-09,1,12.75,0.0,,,,6.11,-5.0
"CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US",2019-01-10,1,6.71,0.0,,,,-1.67,-5.0


Now let's examine the null values in the SNOW_MM and SNWD_MM columns.

In [0]:
# Join the airport list to the weather data on the station NAME.
# Rename the weather table's NAME column first so the joined result has no duplicate column names.
airport_weather_to_join = airport_weather_silver.withColumnRenamed('NAME', 'NAME_DROP')

# Reference NAME_DROP through the same DataFrame that is passed to join(); a column taken
# from a different (earlier) DataFrame may not be resolvable and raises an AnalysisException.
# A left join keeps every airport; airports with no matching weather rows get nulls in the weather columns.
joined_df = (airports_silver
             .join(airport_weather_to_join,
                   airports_silver.NAME == airport_weather_to_join.NAME_DROP,
                   'left')
            )

# display(joined_df)

Our two data tables have now been transformed and prepared for further analysis. Before moving on, we'll write the transformed DataFrames to Delta tables in the silver layer.

In [0]:
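# spark.createDataFrame([]) raises a ValueError because Spark cannot infer a schema from an empty list,
# so supply one explicitly (here, the schema of the silver weather DataFrame).
df = spark.createDataFrame([], airport_weather_silver.schema)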
df.write.format("delta").mode("overwrite").save("/user/hive/warehouse/silver")


In [0]:
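# Spark APIs address DBFS with "dbfs:/" paths; "/dbfs/..." is the local FUSE mount used by non-Spark file APIs.
# "inferSchema" is a CSV read option and has no effect on a write, so it is dropped here.
# "overwriteSchema" lets an overwrite replace a Delta table whose columns changed since the last run.
(airport_weather_silver.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .save("dbfs:/warehouse/default.db/silver"))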

In [0]:
%sql
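-- This query only works once airport_weather_silver exists in the metastore
-- (written with saveAsTable in the next cell) or has been registered as a temp view.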
SELECT * FROM airport_weather_silver

In [0]:
# Write the transformed DataFrames to managed Delta tables (silver layer)
airports_silver.write.mode("overwrite").format("delta").saveAsTable("airport_data_silver")
airport_weather_silver.write.mode("overwrite").format("delta").saveAsTable("airport_weather_silver")

In [0]:
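# listTables() returns a Python list of Table objects, which display() cannot always convert
# to a DataFrame; SHOW TABLES returns a DataFrame directly.
display(spark.sql("SHOW TABLES IN default"))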

In [0]:
# Register df as a temporary view so the SQL cell below can query it

temp_table_name = "airport_weather_2020_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `airport_weather_2020_csv`

In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "airport_weather_2020_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

##Recycling Bin

In [0]:
# Ensure the correct data types after transforming our initial table
airport_weather_silver = (airport_weather_silver
                  .withColumn("PRCP_MM", col('PRCP_MM').cast(FloatType()))
                  .withColumn("SNOW_MM", col('SNOW_MM').cast(FloatType()))
                  .withColumn("SNWD_MM", col('SNWD_MM').cast(FloatType()))
                  .withColumn("TAVG_C", col('TAVG_C').cast(FloatType()))
                  .withColumn("TMAX_C", col('TMAX_C').cast(FloatType()))
                  .withColumn("TMIN_C", col('TMIN_C').cast(FloatType()))
                 )