# PLEASE CLONE THIS NOTEBOOK INTO YOUR PERSONAL FOLDER
# DO NOT RUN CODE IN THE SHARED FOLDER
# THERE IS A 2 POINT DEDUCTION IF YOU RUN ANYTHING IN THE SHARED FOLDER. THANKS!

In [0]:
# The following blob storage is accessible to team members only (read and write).
# The access key is valid until its TTL expires;
# after that you will need to create a new SAS key and authenticate again via the Databricks CLI.
blob_container = "261storagecontainer"  # The name of your container created in https://portal.azure.com
storage_account = "261storage"  # The name of your Storage account created in https://portal.azure.com
secret_scope = "261_team_6_1_spring24_scope"  # The name of the scope created in your local computer using the Databricks CLI
secret_key = "team_6_1_key"  # The name of the secret key created in your local computer using the Databricks CLI
team_blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"  # points to the root of your team storage bucket


# the 261 course blob storage is mounted here.
mids261_mount_path = "/mnt/mids-w261"

# SAS Token: Grant the team limited access to Azure Storage resources
spark.conf.set(
    f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
    dbutils.secrets.get(scope=secret_scope, key=secret_key),
)
import pandas as pd

pdf = pd.DataFrame([[1, 2, 3, "Jane"], [2, 2, 2, None], [12, 12, 12, "John"]], columns=["x", "y", "z", "a_string"])
df = spark.createDataFrame(pdf)  # Create a Spark dataframe from a pandas DF

# The following writes the DataFrame to the team's cloud storage.
# Navigate back to your Storage account in https://portal.azure.com to inspect the partitions/files.
df.write.mode("overwrite").parquet(f"{team_blob_url}/temp")

# see what's in the blob storage root folder
display(dbutils.fs.ls(f"{team_blob_url}"))

In [0]:
from pyspark.sql.functions import col

print("Welcome to the W261 final project!")


# Know your mount
Here is the mount for this class, your source for the original data! Remember, you have read access only, not write access! Also, become familiar with `dbutils`, the equivalent of `gcp` in Dataproc.

In [0]:
data_BASE_DIR = "dbfs:/mnt/mids-w261/"
display(dbutils.fs.ls(f"{data_BASE_DIR}"))

In [0]:
dbutils.fs.help()

# Data for the Project

For the project you will have 4 sources of data:

1. Airlines Data: The raw flight data. It is available as 3-month, 6-month, and 1-year samples, plus the full data from 2015 to 2019. Remember the maxim "Test, Test, Test": do plenty of testing on the smaller samples before scaling up! Locations: `dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/`, `dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data_1y/`, etc. (use `dbutils.fs.ls` to list the folders)
2. Weather Data: The raw weather data. As with the airlines data, we are sharing 3-month, 6-month, and 1-year samples.
3. Stations Data: Extra information on the locations of the weather stations. Location: `dbfs:/mnt/mids-w261/datasets_final_project_2022/stations_data/stations_with_neighbors.parquet/`
4. OTPW Data: This is our joined data (we joined Airlines and Weather). This is the main dataset for your project; the previous three are given for reference. You can attempt your own join for extra credit. Location: `dbfs:/mnt/mids-w261/OTPW_60M/` and others; several samples are provided.
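
To make it easy to test on a small sample before scaling up, the sample folders can be kept in a small lookup. This is a minimal sketch: the helper name `airlines_path` and the keys `"3m"`, `"1y"`, `"full"` are hypothetical, and only folder names that appear in this notebook are included (the 6-month folder name is not shown here, so it is left out rather than guessed).

```python
# Hypothetical helper: maps a sample size to an airlines folder named in this notebook.
BASE = "dbfs:/mnt/mids-w261/datasets_final_project_2022/"

AIRLINES_SAMPLES = {
    "3m": "parquet_airlines_data_3m/",
    "1y": "parquet_airlines_data_1y/",
    "full": "parquet_airlines_data/",
}


def airlines_path(sample: str = "3m") -> str:
    """Return the DBFS path for the requested airlines sample."""
    if sample not in AIRLINES_SAMPLES:
        raise ValueError(
            f"Unknown sample {sample!r}; choose from {sorted(AIRLINES_SAMPLES)}"
        )
    return BASE + AIRLINES_SAMPLES[sample]


print(airlines_path("3m"))
```

In a notebook cell, `spark.read.parquet(airlines_path("3m"))` would then load the smallest sample, and switching to a larger one is a one-word change.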

In [0]:
# Airline Data
df_flights = spark.read.parquet("dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data_3m/")
display(df_flights)

In [0]:
# Weather data
df_weather = spark.read.parquet("dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_weather_data_3m/")
display(df_weather)

In [0]:
# Stations data
df_stations = spark.read.parquet("dbfs:/mnt/mids-w261/datasets_final_project_2022/stations_data/stations_with_neighbors.parquet/")
display(df_stations)

In [0]:
# OTPW
df_otpw = spark.read.format("csv").option("header", "true").load("dbfs:/mnt/mids-w261/OTPW_3M_2015.csv")
display(df_otpw)


# Example of EDA

In [0]:
import matplotlib.pyplot as plt
import pyspark.sql.functions as F

df_weather = spark.read.parquet(
    f"{data_BASE_DIR}datasets_final_project_2022/parquet_weather_data_3m/"
)

# Grouping and aggregation for df_stations
grouped_stations = (
    df_stations.groupBy("neighbor_id")
    .agg(
        F.avg("distance_to_neighbor").alias("avg_distance_to_neighbor"),
    )
    .orderBy("avg_distance_to_neighbor")
)

display(grouped_stations)

# Grouping and aggregation for df_flights
grouped_flights = df_flights.groupBy("OP_UNIQUE_CARRIER").agg(
    F.avg("DEP_DELAY").alias("Avg_DEP_DELAY"),
    F.avg("ARR_DELAY").alias("Avg_ARR_DELAY"),
    F.avg("DISTANCE").alias("Avg_DISTANCE"),
)

display(grouped_flights)

# Convert columns to appropriate data types
df_weather = df_weather.withColumn(
    "HourlyPrecipitationDouble", F.col("HourlyPrecipitation").cast("double")
)
df_weather = df_weather.withColumn(
    "HourlyVisibilityDouble", F.col("HourlyVisibility").cast("double")
)
# Cast wind speed and drop implausible readings; rows where the cast yields null are dropped too
df_weather = df_weather.withColumn(
    "HourlyWindSpeedDouble", F.col("HourlyWindSpeed").cast("double")
).filter(F.col("HourlyWindSpeedDouble") < 2000)

# Overlaid boxplots for df_weather
weather_cols = [
    "HourlyPrecipitationDouble",
    "HourlyVisibilityDouble",
    "HourlyWindSpeedDouble",
]
# toPandas() collects every row to the driver, so keep this to small samples
weather_data = df_weather.select(*weather_cols).toPandas()

plt.figure(figsize=(10, 6))
weather_data.boxplot(column=weather_cols)
plt.title("Boxplots of Weather Variables")
plt.xlabel("Weather Variables")
plt.ylabel("Values")
plt.xticks(rotation=45)
plt.show()

# Pipeline Steps For Classification Problem

These are the "normal" steps for a Classification Pipeline! Of course, you can try more!

## 1. Data cleaning and preprocessing

* Remove outliers or missing values
* Encode categorical features
* Scale numerical features
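
To make the three bullets concrete, here is a minimal plain-Python sketch on a handful of made-up rows. The rows, column meanings, and helper names are illustrative and are not taken from the project data. On the real data you would more likely use the distributed counterparts in `pyspark.ml.feature`, such as `StringIndexer`, `OneHotEncoder`, and `StandardScaler`.

```python
from statistics import mean, pstdev

# Made-up rows for illustration only: (carrier, distance, departure delay)
rows = [
    ("AA", 500.0, 10.0),
    ("DL", 1500.0, None),  # missing delay, so this row is dropped
    ("AA", 1000.0, 30.0),
    ("UA", 1500.0, 20.0),
]

# 1. Remove rows with missing values
clean = [r for r in rows if all(v is not None for v in r)]

# 2. One-hot encode the categorical column
categories = sorted({r[0] for r in clean})


def one_hot(value):
    """Return a 0/1 list with a single 1 at the position of `value`."""
    return [1 if value == c else 0 for c in categories]


# 3. Standardize a numeric column to zero mean and unit variance
distances = [r[1] for r in clean]
mu, sigma = mean(distances), pstdev(distances)


def scale(x):
    """Z-score of x relative to the cleaned distance column."""
    return (x - mu) / sigma


features = [one_hot(r[0]) + [scale(r[1])] for r in clean]
for row in features:
    print(row)
```

In a real pipeline the scaling statistics (`mu` and `sigma` here) should be computed on the training data only and then reused on the holdout data.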

## 2. Feature selection

* Select the most important features for the model
* Use univariate feature selection, recursive feature elimination, or random forest feature importance
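
As one example of univariate feature selection, each feature can be scored on its own against the label and the top-scoring features kept. The sketch below uses the absolute Pearson correlation as the score. This is one possible scoring choice, not the only one, and the feature names and values are made up for illustration.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation of two equal-length lists (0.0 if either is constant)."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy) if sx and sy else 0.0


# Made-up data: 1 = delayed, 0 = on time
label = [0, 0, 1, 1]
features = {
    "wind_speed": [5.0, 7.0, 20.0, 25.0],
    "visibility": [10.0, 9.0, 3.0, 2.0],
    "constant": [1.0, 1.0, 1.0, 1.0],
}

# Score each feature independently, then keep the k best
scores = {name: abs(pearson(values, label)) for name, values in features.items()}
top_k = sorted(scores, key=scores.get, reverse=True)[:2]
print(top_k)
```

The constant column scores zero and is dropped, which is the behaviour you want from any univariate filter: a feature that never varies cannot help separate the classes.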

## 3. Model training

* Train a machine learning model to predict delays of more than 15 minutes
* Use logistic regression, decision trees, random forests, or support vector machines
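
The classifiers listed above need a binary target. A minimal sketch of deriving one from a delay in minutes follows. It assumes the target is based on departure delay (the text above does not say whether departure or arrival delay is meant), and the helper name `is_delayed` is hypothetical.

```python
DELAY_THRESHOLD_MIN = 15  # "delays of more than 15 minutes"


def is_delayed(dep_delay_minutes):
    """Return 1 if the delay exceeds the threshold, 0 if not, None if unknown."""
    if dep_delay_minutes is None:
        return None  # keep missing delays visible instead of labelling them on time
    return int(dep_delay_minutes > DELAY_THRESHOLD_MIN)


labels = [is_delayed(d) for d in [-3.0, 15.0, 16.0, None]]
print(labels)
```

Note that a delay of exactly 15 minutes is labelled 0 under this reading. On a Spark DataFrame, an expression along the lines of `(F.col("DEP_DELAY") > 15).cast("int")` would give the same labels and likewise leave missing delays as null.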

## 4. Model evaluation

* Evaluate the performance of the trained model on a holdout dataset
* Use accuracy, precision, recall, or F1 score
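
The four metrics named above follow directly from the confusion-matrix counts. Here is a minimal plain-Python sketch with made-up labels (1 = delayed) and a hypothetical helper name `classification_metrics`. On Spark DataFrames you would more likely use an evaluator from `pyspark.ml.evaluation`, such as `MulticlassClassificationEvaluator`.

```python
def classification_metrics(y_true, y_pred):
    """Accuracy, precision, recall, and F1 for binary labels (1 = positive)."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)

    accuracy = (tp + tn) / len(pairs)
    # Guard against division by zero when a class is never predicted or never present
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Made-up holdout labels and predictions: 3 delayed flights out of 10
y_true = [1, 0, 0, 0, 1, 0, 0, 1, 0, 0]
y_pred = [1, 0, 0, 0, 0, 0, 1, 0, 0, 0]

metrics = classification_metrics(y_true, y_pred)
for name, value in metrics.items():
    print(f"{name}: {value:.3f}")
```

With these made-up labels, accuracy is 0.7 while recall is only about 0.33, which is why accuracy alone can flatter a model when delayed flights are the minority class.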

## 5. Model deployment

* Deploy the trained model to a production environment
* Deploy the model as a web service or as a mobile app

## Tools

* Spark's MLlib and SparkML libraries
* These libraries provide parallelized methods for data cleaning and preprocessing, feature selection, model training, model evaluation, and model deployment, which we will use for this classification problem.
