## Joined Data
This notebook walks through the workflow for building the joined dataframe. The airlines data table, `df_airlines_all`, has about 31 million rows, and the weather data table, `df_weather_all`, has about 41 million rows.

### Challenge: 

The biggest challenge was finding a common column on which to join the two tables. The airlines data table has a weather station column, but its station names do not always correspond to the weather stations in the weather table. In this notebook, we explain each step we took to solve this problem.

In [0]:
blob_container = "team07" # The name of your container created in https://portal.azure.com
storage_account = "team07" # The name of your Storage account created in https://portal.azure.com
secret_scope = "team07" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "team07" # The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [0]:
# Register the SAS token (read from the Databricks secret scope) so Spark can access the blob container
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [0]:
# Source: https://ucbischool.slack.com/archives/C02C3SFLC11/p1635569501096100?thread_ts=1635526204.076500&cid=C02C3SFLC11 
# Displays what is currently in the blob
# Put this at the top of every notebook
display(dbutils.fs.ls(blob_url))

In [0]:
# Inspect the Mount's Final Project folder 
display(dbutils.fs.ls("/mnt/mids-w261/datasets_final_project"))

In [0]:
# Import necessary libraries
import pyspark
import pytz
import numpy as np
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.functions import concat, col, split, udf
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.window import Window

In [0]:
# Load Data

df_airlines_all = spark.read.parquet(f"{blob_url}/df_airlines_final_full_v7")
df_weather_all = spark.read.parquet(f"{blob_url}/weather_data_trimmed_full_USonly2")

display(df_airlines_all)
display(df_weather_all)

In [0]:
df_airlines_all.printSchema()

In [0]:
df_weather_all.printSchema()

The airlines table has no column that uniquely identifies each flight, so we add a unique row ID (`row_num`) to distinguish every flight.

In [0]:
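# Assign each flight a unique ID. monotonically_increasing_id() avoids an
# un-partitioned window, which would pull all ~31M rows into a single partition.
# The IDs are unique but not consecutive, which is fine for use as a join key.
df_airlines_all = df_airlines_all.withColumn('row_num', sf.monotonically_increasing_id())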
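Numbering rows with a window that has no `partitionBy` forces Spark to move every row into a single partition. When only a unique (not necessarily consecutive) ID is needed, `sf.monotonically_increasing_id()` is an alternative that avoids this. According to the PySpark documentation, it puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The pure-Python sketch below mimics that layout with made-up partition sizes to show why the IDs are unique but have gaps.

```python
def monotonic_ids(partition_sizes):
    """Mimic the documented layout of Spark's monotonically_increasing_id():
    partition index in the upper bits, record position in the lower 33 bits."""
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        for position in range(size):
            ids.append((partition_index << 33) + position)
    return ids

# Hypothetical layout: three partitions holding 3, 2 and 4 flights
ids = monotonic_ids([3, 2, 4])
print(ids[:4])  # [0, 1, 2, 8589934592]
```

Because `row_num` is only used as a grouping and join key later on, the gaps between partitions do not matter.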

In [0]:
#df_airlines_all.display()

We combine the latitude and longitude columns of the weather table into a single string column, `lat_lng`, with the two values separated by a space.

In [0]:
# Concatenate latitude and longitude into one string column: "<lat> <lng>"
df_weather_all = df_weather_all.withColumn(
  "lat_lng",
  concat(df_weather_all.LATITUDE.cast(StringType()), sf.lit(" "), df_weather_all.LONGITUDE.cast(StringType()))
)
#display(df_weather_all)

Next, we collect the distinct (`lat_lng`, `STATION`) pairs to the driver so that we can search them in plain Python when matching cities to stations.

In [0]:
uniq_weather_station_lat_lng_list = df_weather_all.select("lat_lng", "STATION").distinct().collect()

We now write a function that computes the great-circle distance between two latitude-longitude pairs using the Haversine formula.
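For reference, with latitudes $\varphi_1, \varphi_2$ and longitudes $\lambda_1, \lambda_2$ converted to radians, and Earth radius $R = 6371$ km as in the code, the Haversine distance is:

$$
\begin{aligned}
a &= \sin^2\!\left(\frac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \, \sin^2\!\left(\frac{\lambda_2 - \lambda_1}{2}\right) \\
c &= 2\,\operatorname{atan2}\!\left(\sqrt{a}, \sqrt{1 - a}\right) \\
d &= R\,c
\end{aligned}
$$

Here $a$, $c$ and $d$ correspond to the variables `a`, `c` and `d` in `get_distance_between_2_coords`.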

In [0]:
import math

def deg2rad(deg):
  return deg * (math.pi/180)

def get_distance_between_2_coords(lat1, lon1, lat2, lon2):
  # dist using Haversine formula - http://en.wikipedia.org/wiki/Haversine_formula
  R = 6371 # Radius of the earth in km
  dLat = deg2rad(lat2-lat1)
  dLon = deg2rad(lon2-lon1)
  a = math.sin(dLat/2) * math.sin(dLat/2) + math.cos(deg2rad(lat1)) * math.cos(deg2rad(lat2)) * math.sin(dLon/2) * math.sin(dLon/2)
  c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
  d = R * c # Distance in km
  return d
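A quick way to sanity-check a Haversine implementation is to compare it with distances that are known exactly on a sphere: zero for identical points, and a quarter of a great circle ($\pi R / 2 \approx 10{,}007.5$ km) from the equator to a pole. The sketch below restates the formula with `math.radians` so that it is self-contained; the test points are arbitrary.

```python
import math

EARTH_RADIUS_KM = 6371  # same mean radius as get_distance_between_2_coords

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.atan2(math.sqrt(a), math.sqrt(1 - a))

# Identical points are 0 km apart
print(haversine_km(40.0, -75.0, 40.0, -75.0))
# Equator to North Pole should equal a quarter of a great circle
print(haversine_km(0.0, 0.0, 90.0, 0.0), math.pi * EARTH_RADIUS_KM / 2)
```

The same checks can be run against `get_distance_between_2_coords` directly in the notebook.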

In the following cell, we use Google's Geocoding API to find the latitude and longitude of each departure city. We then map these coordinates to the nearest weather station using the Haversine formula, which gives the great-circle distance between two points on a sphere from their latitudes and longitudes. As a result, each departure city in the airlines data is assigned a weather station from which we can extract hourly weather data.

In [0]:
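import requests

# Read the Geocoding API key from the secret scope instead of hard-coding it in the notebook.
# Assumption: the key name "google-maps-api-key" is hypothetical; use the name the key is stored under.
GOOGLE_MAPS_API_KEY = dbutils.secrets.get(scope=secret_scope, key="google-maps-api-key")
GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"

def _candidate_locations(dept_city):
  # Addresses to try, most specific first, e.g. "Dallas/Fort Worth, TX" ->
  # ["Dallas/Fort Worth, TX", "Dallas, TX", "Fort Worth, TX", "Dallas/Fort Worth"]
  candidates = [dept_city]
  if "/" in dept_city and "," in dept_city:
    cities, state = dept_city.rsplit(",", 1)
    candidates += [f"{city.strip()}, {state.strip()}" for city in cities.split("/")]
  if "," in dept_city:
    candidates.append(dept_city.split(",")[0])
  return candidates

def _get_lat_lng(location):
  # Return {"lat": ..., "lng": ...}, or None if Google cannot geocode the address
  response = requests.get(GEOCODE_URL, params={"address": location, "key": GOOGLE_MAPS_API_KEY}, timeout=10).json()
  results = response.get("results", [])
  return results[0]["geometry"]["location"] if results else None

def get_matching_weather_station(dept_city):
  # Try each candidate address until one can be geocoded
  city_coords = None
  for location in _candidate_locations(dept_city):
    city_coords = _get_lat_lng(location)
    if city_coords is not None:
      break
  if city_coords is None:
    return None  # no candidate address could be geocoded

  # Linear scan for the weather station with the smallest great-circle distance
  min_dist = math.inf
  closest_station = None
  for row in uniq_weather_station_lat_lng_list:
    weather_lat, weather_lng = row["lat_lng"].split()
    dist = get_distance_between_2_coords(float(city_coords["lat"]), float(city_coords["lng"]), float(weather_lat), float(weather_lng))
    if dist < min_dist:
      min_dist = dist
      closest_station = row["STATION"]
  return closest_station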
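The nearest-station search in `get_matching_weather_station` is a linear scan over every station for each city. The same logic can be written more compactly with `min(..., key=...)`. The sketch below assumes station rows shaped like `uniq_weather_station_lat_lng_list` (a `"lat lng"` string plus a `STATION` ID); the station IDs and coordinates are hypothetical.

```python
import math

def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle distance in km (Earth radius 6371 km); inputs in degrees
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    a = (math.sin(math.radians(lat2 - lat1) / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(math.radians(lon2 - lon1) / 2) ** 2)
    return 2 * 6371 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

def nearest_station(city_lat, city_lng, stations):
    """Return (station_id, distance_km) for the station closest to the city.

    `stations` is a list of dicts with a "lat_lng" string ("<lat> <lng>") and a
    "STATION" ID, mirroring the collected Spark rows.
    """
    def distance(row):
        lat, lng = map(float, row["lat_lng"].split())
        return haversine_km(city_lat, city_lng, lat, lng)

    best = min(stations, key=distance)
    return best["STATION"], distance(best)

# Hypothetical stations: IDs and coordinates are illustrative only
stations = [
    {"lat_lng": "40.0 -75.0", "STATION": "STATION_A"},
    {"lat_lng": "41.0 -87.0", "STATION": "STATION_B"},
    {"lat_lng": "34.0 -118.0", "STATION": "STATION_C"},
]
print(nearest_station(41.9, -87.6, stations))  # a point near Chicago
```

Since this lookup runs once per distinct departure city on the driver, a brute-force scan is usually adequate; a spatial index would only pay off with many more lookups.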

In [0]:
uniq_cities_rows = df_airlines_all.select("ORIGIN_CITY_NAME").distinct().collect()
len(uniq_cities_rows)

In [0]:
uniq_cities = [city.ORIGIN_CITY_NAME for city in uniq_cities_rows]
uniq_cities

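Build the map from each departure city to its nearest weather station ID and inspect it. Note that this calls the Geocoding API at least once per distinct departure city.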

In [0]:
city_to_station_map = {city: get_matching_weather_station(city) for city in uniq_cities}
city_to_station_map

In [0]:
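# Look up each city's station; .get returns None for unmatched cities instead of raising KeyError
lookup = sf.udf(lambda city: city_to_station_map.get(city), returnType=StringType())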

Add the weather station ID to the airlines data table. This column is the join key in the next steps.

In [0]:
df_airlines_all = df_airlines_all.withColumn("weather_station", lookup(df_airlines_all.ORIGIN_CITY_NAME))

### Joining Datasets

The first join matches each flight with the weather records of its assigned station (`weather_station == STATION`). Because the join has this equality condition, Spark evaluates it as an inner equi-join rather than a full Cartesian product of the two dataframes: each flight is paired with every weather observation recorded at its station. This is what we want, since each station has many timestamped observations and we need all of them before selecting the right one.

Next, we keep only the weather observations recorded between 2 and 4 hours (7,200 to 14,400 seconds, inclusive) before departure. For each flight, we then take the smallest remaining time difference, which selects the most recent observation that is at least 2 hours old.

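The final step is an inner join of these per-flight minimum time differences back onto the filtered joined dataframe on `row_num` and `time_diff`, which keeps only the latest qualifying weather row for each flight. If a station has more than one observation with the same timestamp, the matching flight appears once per observation. The final joined dataframe has about 26 million rows.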
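The filter-then-minimum logic amounts to an "as-of" lookup: for each flight, find the most recent observation from its station that is between 2 and 4 hours old. The pure-Python sketch below does this with `bisect` on a sorted list of observation times. It assumes both timestamps are Unix seconds (as the 7,200 and 14,400 second bounds imply), and the observation data is made up.

```python
import bisect

MIN_LAG_S = 2 * 3600  # observation must be at least 2 hours before departure
MAX_LAG_S = 4 * 3600  # ... and at most 4 hours before departure

def latest_weather(depart_ts, obs_times):
    """Return the latest observation time t with
    MIN_LAG_S <= depart_ts - t <= MAX_LAG_S, or None if there is none.

    obs_times: sorted list of observation Unix timestamps for one station.
    """
    cutoff = depart_ts - MIN_LAG_S                    # newest allowed observation
    idx = bisect.bisect_right(obs_times, cutoff) - 1  # index of last t <= cutoff
    if idx >= 0 and depart_ts - obs_times[idx] <= MAX_LAG_S:
        return obs_times[idx]
    return None

# Hypothetical hourly observations (seconds) for one station
obs = [0, 3600, 7200, 10800, 14400]
print(latest_weather(18000, obs))  # 18000 - 10800 = 7200 s old -> 10800
print(latest_weather(40000, obs))  # newest obs is 25600 s old -> None
```

Both bounds are inclusive, matching the `>= 7200` and `<= 14400` filter used on `time_diff`.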

In [0]:
condition = [df_airlines_all.weather_station == df_weather_all.STATION]

In [0]:
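# Inner equi-join on station ID: each flight is paired with every observation from its station
# (a "cross" join with a condition is evaluated the same way)
df_joined = df_airlines_all.join(df_weather_all, on=condition, how="inner")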
# display(df_joined)

In [0]:
df_joined.printSchema()

In [0]:
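# Seconds between the weather observation and departure
# (assumes depart_unix_timestamp and DATE are both Unix timestamps in seconds)
df_joined = df_joined.withColumn('time_diff', df_joined.depart_unix_timestamp - df_joined.DATE)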

In [0]:
#df_joined.display()

In [0]:
# Keep observations recorded 2 to 4 hours (7,200 to 14,400 s) before departure
df_joined = df_joined.filter((df_joined.time_diff >= 7200) & (df_joined.time_diff <= 14400))

In [0]:
df_joined_subset = df_joined.groupBy('row_num').min('time_diff')

In [0]:
df_joined_subset = df_joined_subset.withColumnRenamed("row_num","row_number")
df_joined_subset = df_joined_subset.withColumnRenamed("min(time_diff)","time_difference")
#df_joined_subset.display()

In [0]:
#df_joined_subset.count()

In [0]:
condition_for_final_join = [(df_joined_subset.row_number == df_joined.row_num) & (df_joined_subset.time_difference == df_joined.time_diff)]

In [0]:
df_joined_latest_weather = df_joined_subset.join(df_joined, on = condition_for_final_join, how = "inner")
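Joining the per-flight minimum back on `(row_num, time_diff)` keeps every candidate row that ties the minimum, so a flight can appear more than once if its station reports duplicate timestamps. The small pure-Python model of the groupBy-min/join-back step below uses hypothetical rows to show the effect and one way to keep exactly one row per flight.

```python
from collections import defaultdict

# Hypothetical (row_num, time_diff, weather_reading) candidates after the 2-4 hour filter
candidates = [
    (1, 9000, "obs_a"),
    (1, 7300, "obs_b"),
    (2, 8000, "obs_c"),
    (2, 8000, "obs_d"),  # duplicate timestamp for flight 2
]

# Equivalent of groupBy('row_num').min('time_diff')
min_diff = defaultdict(lambda: float("inf"))
for row_num, time_diff, _ in candidates:
    min_diff[row_num] = min(min_diff[row_num], time_diff)

# Inner join back on (row_num, time_diff): ties survive
joined = [c for c in candidates if c[1] == min_diff[c[0]]]
print(joined)  # flight 2 appears twice

# One row per flight: keep the first candidate that reaches the minimum
deduped = {}
for row_num, time_diff, reading in joined:
    deduped.setdefault(row_num, (row_num, time_diff, reading))
print(sorted(deduped.values()))
```

In Spark, the analogous approach would be `sf.row_number()` over a window partitioned by `row_num` and ordered by `time_diff`, keeping only rank 1; ties are then broken arbitrarily.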

In [0]:
#df_joined_latest_weather.count()

In [0]:
#df_joined_latest_weather.display()

At this point, we checkpoint the joined data by saving it to the blob container as a Parquet file.

In [0]:
#df_joined_latest_weather.write.mode("overwrite").parquet(f"{blob_url}/joined_data_all_v1")

In [0]:
# Load Data

df_joined_reload = spark.read.parquet(f"{blob_url}/joined_data_all_v1")

We ran into a major problem here. Writing the joined dataframe to Parquet completes without errors, but when we reload the checkpoint, either here or in another notebook, the row count is drastically lower than expected: only about 2 million of the ~26 million joined rows are recovered. For the rest of the notebook, we use this ~2 million row dataframe as our complete dataset.

In [0]:
df_joined_reload.count()

In [0]:
# Load Data

df_joined_v1 = spark.read.parquet(f"{blob_url}/joined_data_all_v1")

In [0]:
df_joined_v1.count()

### EDA on the Joined Data (`df_joined_v1`)

In [0]:
df_joined_v1_pd = df_joined_v1.toPandas()

In [0]:
df_joined_v1_pd.shape

#### Missing Data

In [0]:
import matplotlib.pyplot as plt
missing_cols = df_joined_v1_pd.isnull().sum()
plot_cols = missing_cols[missing_cols > 1000].sort_values()  # keep columns with more than 1,000 missing values
plot_cols.plot.bar(figsize=(12,8))
plt.xlabel("Feature",fontsize=14)
plt.ylabel("Missing values",fontsize=14)
plt.title("Barchart of counts of missing values",fontsize=16)
plt.show()

In [0]:
df_temp = pd.DataFrame(df_joined_v1_pd.groupby('ORIGIN_CITY_NAME').count()['Bad_Weather_Prediction'])

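Departure cities with more than 10,000 non-null `Bad_Weather_Prediction` values. Note that `count()` counts non-null entries, so this is the number of flights with a value in that column, not the number of bad-weather predictions; if the column is a 0/1 flag, `sum()` would give the latter.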

In [0]:
df_temp[df_temp['Bad_Weather_Prediction'] > 10000].sort_values(by=['Bad_Weather_Prediction'],ascending=False)

In [0]:
df_joined_v1_pd.describe()

In [0]:
len(df_joined_v1_pd['TAIL_NUM'].unique())

In [0]:
df_temp1 = pd.DataFrame(df_joined_v1_pd.groupby('TAIL_NUM').count()['Bad_Weather_Prediction'])
df_temp1