# DeltaStream Silver Layer
---
This stage performs the following steps:
- Read bronze layer data
- Process and clean Airplane data
  - Preprocess columns
  - Handle null/missing values
- Process and clean Weather data
  - Extract relevant data
- Join weather and airplane data by timestamp
- Save final data as silver delta table

In [0]:
from pyspark.sql.functions import (from_json, col, when, from_unixtime, 
                                   round, schema_of_json, lit, date_format, 
                                   row_number, abs, to_timestamp)
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException

In [0]:
# Load the raw bronze tables that feed this stage.
bronze_query = "SELECT * FROM {}"
airplane_df = spark.sql(bronze_query.format("airplane_delta_bronze"))
weather_df = spark.sql(bronze_query.format("weather_delta_bronze"))

# Drop the per-source silver tables if they exist. The visible cells of this
# notebook do not recreate them; only the combined `delta_silver` table is
# written at the end.
for silver_table in ("airplane_delta_silver", "weather_delta_silver"):
    spark.sql(f"DROP TABLE IF EXISTS {silver_table}")

<br><br>
### Process Airplane Data
---

In [0]:
# Preview the raw airplane rows loaded from the bronze table.
display(airplane_df)

In [0]:
# The bronze `value` column is parsed as a JSON array of strings so that the
# individual fields can be extracted by position.
schema = ArrayType(StringType())
airplane_df_without_str = airplane_df.withColumn(
    "value_array", from_json(col("value"), schema)
)

# Output column names, listed in the positional order of the array elements.
# NOTE: the "longtitude" spelling is kept as-is because it is the column name
# that ends up in the output table.
airplane_fields = [
    "icao24",
    "callsign",
    "origin_country",
    "time_position",
    "last_contact",
    "longtitude",
    "latitude",
    "baro_altitude",
    "on_ground",
    "velocity",
    "true_track",
    "vertical_rate",
    "sensors",
    "geo_altitude",
    "squawk",
    "spi",
    "position_source",
]

# Add one column per array element, in the same order as the list above.
new_airplane_df = airplane_df_without_str
for position, field_name in enumerate(airplane_fields):
    new_airplane_df = new_airplane_df.withColumn(
        field_name, col("value_array")[position]
    )

# The raw payload and the intermediate array are no longer needed.
new_airplane_df = new_airplane_df.drop("value", "value_array")

# Make the parsed data available to SQL cells under the name `airplane_df`.
new_airplane_df.createOrReplaceTempView("airplane_df")

In [0]:
# Preview the airplane data after the value array has been split into named columns.
display(new_airplane_df)

In [0]:
%sql
-- Inspect the altitude and vertical-rate fields for aircraft reported as on the ground.
SELECT baro_altitude, vertical_rate, geo_altitude
FROM airplane_df
WHERE on_ground = 'true'

Since all of the above fields are null when an airplane is on the ground, they can be set to 0.

In [0]:
# True for rows whose `on_ground` field is the string 'true'.
grounded = col("on_ground") == 'true'


def _scaled(column_name):
    """Return `column_name` multiplied by 3.6, divided by 1.852, rounded to 2 places.

    Presumably this converts m/s to knots (x3.6 -> km/h, /1.852 -> knots);
    confirm the source units against the upstream feed.
    """
    return round((col(column_name) * 3.6) / 1.852, 2)


# Column transformations, applied strictly in this order: `vertical_speed`
# must be computed after `vertical_rate` has been zeroed for grounded rows.
airplane_steps = [
    ("altitude", when(grounded, "0").otherwise(col("baro_altitude"))),
    ("vertical_rate", when(grounded, "0").otherwise(col("vertical_rate"))),
    ("time_position", from_unixtime(col("time_position"), "MM-dd-yyyy HH:mm:ss")),
    ("vertical_speed", _scaled("vertical_rate")),
    ("speed", _scaled("velocity")),
]

final_airplane_df = new_airplane_df
for target_column, expression in airplane_steps:
    final_airplane_df = final_airplane_df.withColumn(target_column, expression)

# Keep only rows that carry a non-null, non-empty callsign.
final_airplane_df = final_airplane_df.dropna(subset=["callsign"])
final_airplane_df = final_airplane_df.filter(col("callsign") != "")

# Remove the source columns that are not carried forward.
unused_columns = [
    "velocity", "vertical_rate", "squawk", "spi",
    "position_source", "sensors", "geo_altitude",
    "last_contact", "baro_altitude", "icao24",
]
final_airplane_df = final_airplane_df.drop(*unused_columns)

In [0]:
# Preview the cleaned airplane data.
display(final_airplane_df)

<br><br>
### Process Weather Data
---

In [0]:
# Preview the raw weather rows loaded from the bronze table.
display(weather_df)

In [0]:
# Infer the weather JSON schema from one sample record. A non-null sample is
# required: `first()` returns None on an empty result, and a null payload
# cannot be used for schema inference.
sample_row = weather_df.select("value").where(col("value").isNotNull()).first()
if sample_row is None:
    raise ValueError(
        "weather_delta_bronze has no non-null 'value' rows; "
        "cannot infer the weather JSON schema"
    )

# NOTE: the schema reflects only the sampled record; fields that are absent
# from that record will not be parsed from other rows.
schema = schema_of_json(lit(sample_row[0]))

# Parse every payload into a struct column using the inferred schema.
new_weather_df = weather_df.withColumn("value_json", from_json(col("value"), schema))
display(new_weather_df)

In [0]:
# Path of the struct that holds the weather observation fields.
weather_root = "value_json.data"

# Output column -> expression, applied in insertion order.
weather_columns = {
    "timestamp": date_format(col(f"{weather_root}.timestamp"), "MM-dd-yyyy HH:mm:ss"),
    "dewpoint": col(f"{weather_root}.dewpoint.value"),
    "temperature": col(f"{weather_root}.temperature.value"),
    # Divided by 1000 and rounded to 2 places (presumably metres -> km; confirm units).
    "visibility": round(col(f"{weather_root}.visibility.value") / 1000, 2),
    "wind_speed": col(f"{weather_root}.windSpeed.value"),
    "wind_direction": col(f"{weather_root}.windDirection.value"),
}

final_weather_df = new_weather_df
for column_name, expression in weather_columns.items():
    final_weather_df = final_weather_df.withColumn(column_name, expression)

# Drop the raw payload and parsed struct, then remove exact duplicate rows.
final_weather_df = final_weather_df.drop("value", "value_json")
final_weather_df = final_weather_df.dropDuplicates()

In [0]:
# Preview the cleaned, de-duplicated weather data.
display(final_weather_df)

<br><br>
### Join and Store Weather and Airplane Data
---

In [0]:
# Parse the formatted timestamp strings back into timestamps so the two
# sources can be compared numerically.
final_airplane_df = final_airplane_df.withColumn(
    "time_position_ts", to_timestamp(col("time_position"), "MM-dd-yyyy HH:mm:ss")
)
final_weather_df = final_weather_df.withColumn(
    "timestamp_ts", to_timestamp(col("timestamp"), "MM-dd-yyyy HH:mm:ss")
)

# The full set of airplane columns identifies one airplane record. Captured
# before the join so that weather columns are not included.
airplane_columns = final_airplane_df.columns

# Pair every airplane record with every weather observation and measure how
# far apart they are in time (absolute difference in seconds).
df_joined = final_airplane_df.crossJoin(final_weather_df)
df_joined = df_joined.withColumn(
    "time_diff_sec",
    abs(col("time_position_ts").cast("long") - col("timestamp_ts").cast("long")),
)

# Rank the weather observations per airplane record, closest in time first.
# Partitioning by the whole airplane record (rather than by the timestamp
# alone) keeps every airplane: partitioning only by `time_position_ts` would
# retain a single row per distinct timestamp and discard all other aircraft
# that share it. Exact duplicate airplane rows still collapse into one row.
# `timestamp_ts` is a tie-breaker so the chosen observation is deterministic
# when two observations are equally close.
window = Window.partitionBy(*airplane_columns).orderBy("time_diff_sec", "timestamp_ts")

# Keep only the closest weather observation for each airplane record.
final_df = (df_joined.withColumn("rn", row_number().over(window))
                     .filter(col("rn") == 1)
                     .drop("rn", "time_diff_sec"))

# The helper timestamp columns were only needed for the matching step.
final_df = final_df.drop("time_position_ts", "timestamp_ts")

In [0]:
# Preview the joined airplane/weather result.
display(final_df)

In [0]:
# Remove the `delta_silver` table if it already exists.
spark.sql("DROP TABLE IF EXISTS delta_silver")

In [0]:
# Configure a Delta writer that replaces existing data and allows the table
# schema to be overwritten.
silver_writer = final_df.write.format("delta")
silver_writer = silver_writer.mode("overwrite")
silver_writer = silver_writer.option("overwriteSchema", "true")

# Persist the joined result as the `delta_silver` table.
silver_writer.saveAsTable("delta_silver")