# Group Project Milestone 2: Data Exploration & Initial PreProcessing

In this assignment you will need to:

1. Create a GitHub ID
2. Create a GitHub Repository (Public or Private it is up to you. In the end it will have to be Public) and add your group members as collaborators
3. Perform the data exploration step (i.e. evaluate your data, # of observations, details about your data distributions, scales, missing data, column descriptions) Note: For image data you can still describe your data by the number of classes, # of images, plot example classes of the image, size of images, are sizes uniform? Do they need to be cropped? normalized? etc.
4. Plot your data. For tabular data, you will need to run scatters, for image data, you will need to plot your example classes.
5. How will you preprocess your data? You should explain this in your README.md file and link your Jupyter notebook to it. All code and  Jupyter notebooks have be uploaded to your repo.
6. You must also include in your Jupyter Notebook, a link for data download and environment setup requirements: 


!wget !unzip like functions as well as !pip install functions for non standard libraries not available in colab are required to be in the top section of your jupyter lab notebook. Or having the data on GitHub (you will need the academic license for GitHub to do this, larger datasets will require a link to external storage).

## GitHub ID

https://github.com/SmoothData-BigBrain

## Dataset link

https://www.kaggle.com/datasets/robikscube/flight-delay-dataset-20182022

## Data Exploration

Perform the data exploration step (i.e. evaluate your data, # of observations, details about your data distributions, scales, missing data, column descriptions) Note: For image data you can still describe your data by the number of classes, # of images, plot example classes of the image, size of images, are sizes uniform? Do they need to be cropped? normalized? etc.

### Import libraries

In [None]:
import seaborn as sns
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import glob

# importing util.py 
import util

from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, isnan, when, count, isnull

import seaborn as sns
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# home_dir = os.path.expanduser('~')
download_path = os.path.join('/workspaces/GroupProject/data/')
# home_dir = os.path.expanduser('~')
# download_path = os.path.join(home_dir, 'GroupProject/data/')
file_id = '1tch7xbFIgBtXKXa16E4QCpVKedUExfO3'  # My File ID for airlines.zip on GDrive 
util.check_and_fetch_data(file_id, download_path)

In [None]:
spark = SparkSession.builder \
    .appName("Flight Data Analysis") \
    .getOrCreate()

spark.conf.set("spark.sql.debug.maxToStringFields", 1000)
spark.sparkContext.setLogLevel("ERROR")

### Read in dataset

#### read in individual raw files

In [None]:
os.getcwd()

In [None]:

# folder_path = '~/Desktop/GroupProject/data/archive/raw'
csv_files = glob.glob(f"{download_path}/archive/raw/*.csv")
df = spark.read.csv(csv_files,
                       sep = ',',
                       inferSchema = True,
                       header = True)

#### read in combined raw files

In [None]:
df = spark.read.csv('combined_file.csv', sep = ',', inferSchema = True, header = True)

#### read in dataset column description csv

In [None]:
col_des = spark.read.csv('flights_column_des.csv', sep = ',', inferSchema = True, header = True)

## Explore Dataset

### Get dataset shape

In [None]:
# get df shape
num_rows = df.count()
num_cols = len(df.columns)
print(f"Shape of the DataFrame: ({num_rows}, {num_cols})")

In [None]:
num_entries = df.count() #29193782
#num_entries = 29193782

### Explore null values

#### Column:Null Value Counts stored as a dictionary

In [None]:
#null_counts = df.select([count(col(c)).alias(c) for c in df.columns]).collect()[0].asDict()
#print(null_counts)

In [None]:
# compute count of non-null vals for each col in df
from pyspark.sql.functions import col, sum

null_counts = df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns]) \
                .collect()[0] \
                .asDict()

print(null_counts)

In [None]:
# Dictionary indicates that the last several cols have all nulls, print last 10 cols of df to manually inspect if NULLS are present
df.select(df.columns[-10:]).show(5)

In [None]:
# Dictionary indicates that the first several cols have no nulls, print first 10 cols of df to manually inspect if NULLS are present
df.select(df.columns[:10]).show(5)

#### Checking dataset for columns with few nulls or no nulls

In [None]:
columns_with_few_nulls = [col_name for col_name, count_val in null_counts.items() if count_val > 0.9*num_entries]
#columns_with_no_nulls = [col_name for col_name, count_val in null_counts.items() if count_val != 0]
print(columns_with_few_nulls)
#print(columns_with_all_nulls)
#df.select(*columns_with_nulls).show()

#### Filtering dataset for columns with few nulls & viewing output

In [None]:
newdf = df.select(columns_with_few_nulls)
#newdf.select(newdf.columns[0:10]).show()

In [None]:
#newdf.select(newdf.columns[10:19]).show()

In [None]:
#newdf.select(newdf.columns[19:33]).show()

In [None]:
#newdf.select(newdf.columns[33:50]).show()

In [None]:
#newdf.select(newdf.columns[50:62]).show()

In [None]:
## Count non-nulls in a list of specified columns

#repeat_cols = ["Marketing_Airline_Network", "Operated_or_Branded_Code_Share_Partners", "IATA_Code_Marketing_Airline", "Operating_Airline ", "IATA_Code_Operating_Airline"]
#print(newdf.select([count(col(c)).alias(c) for c in repeat_cols]).collect()[0].asDict())

In [None]:
#print(newdf.select(repeat_cols).distinct().count())

In [None]:
## filter dataset with cols to keep

cols_to_keep = ["Year", "Month", "DayofMonth", "Origin", "OriginCityName", "DestCityName", "DepDelay", "ArrDelay", "Cancelled", "CRSElapsedTime", "ActualElapsedTime"]
my_df = newdf.select(cols_to_keep)
my_df.show()

#### Computing non-null counts as percentages

In [None]:
non_null_counts = df.select([count(col(c)).alias(c) for c in df.columns]).collect()[0].asDict()

# Calculate non-null percentages
non_null_percentages = {
    col_name: (count_val / num_entries) * 100
    for col_name, count_val in non_null_counts.items()
}

sorted_columns = sorted(non_null_percentages.items(), key=lambda x: x[1], reverse=True)

for col_name, pct in sorted_columns:
    print(f"{col_name}: {pct:.2f}% non-null")

#### **Discussion**

Dataset consists of columns with >90% non-null values and then it drops down to 0-17% non-null. Dataset to be used for further exploration will only include columns with >90% non-null values for more robust analysis

### Subset dataset - removing columns with <90% null values

In [None]:
columns_above_90 = [col_name for col_name, pct in non_null_percentages.items() if pct >= 90]
filtered_df = df.select(columns_above_90)
filtered_df.select(filtered_df.columns[:8]).show(5)

In [None]:
# get filtered df shape
num_rows = filtered_df.count()
num_cols = len(filtered_df.columns)
print(f"Shape of the Filtered DataFrame removing cols w/ <90% null values: ({num_rows}, {num_cols})")

In [None]:
# save filtered df to not have to redo code later
#filtered_df.coalesce(1).write.mode("overwrite").option("header", True).csv("filtered_df_temp")

# read in already filtered_df saved previously
filtered_df = spark.read.csv('part-00000-b248588c-b561-414a-ba2c-bc77825e455a-c000.csv', sep = ',', inferSchema = True, header = True)

### Remaining Column Descriptions

In [None]:
col_des.count() # count original col number

In [None]:
# get all cols in filtered_df
filtered_cols = filtered_df.columns 

# remove any white space
filtered_cols = [str(c).strip() for c in filtered_cols]

# subset column description dataframe for only columns in filtered dataset
filtered_col_des = col_des.filter(col('column').isin(filtered_cols))

In [None]:
# check df was filtered correctly, length & row count should match
print(len(filtered_cols))
print(filtered_col_des.count())

In [None]:
# View all column descriptions in filtered dataframe

filtered_col_des.show(n=filtered_col_des.count(), truncate=False)

### Explore Dataset Statistics & Distributions

In [None]:
# get data type for each column
for name, dtype in filtered_df.dtypes:
    print(f"{name}: {dtype}")

In [None]:
non_string_cols = [col_name for col_name, dtype in filtered_df.dtypes if dtype != 'string']

In [None]:
# subset column description dataframe for only non-string
non_string_col_des = filtered_col_des.filter(col('column').isin(non_string_cols))
non_string_col_des.show(n=non_string_col_des.count(), truncate=False)

### Discussion on skewed data distributions

When taking a look at the columns with the most amount of skew in the data distribution, columns that are ID inidicators or Flight numbers do not make sense to further investigations of data distributions. Although these are numerical values, they represent categorical variables as opposed to continuous. 

Columns with 'ID','Number', 'Origin', 'Dest' in the column name will be removed from statistical analysis to remove these categorical variables 

In [None]:
cont_col_des = non_string_col_des.filter(
    ~non_string_col_des['column'].rlike('Dest|Origin|ID|Number|FlightDate')
)
cont_col_des.show(n=cont_col_des.count(), truncate=False)

In [None]:
# get statistics for all continuous variables
cont_cols = [row['column'] for row in cont_col_des.select('column').collect()]

describe_df = filtered_df.select(cont_cols).describe()

# compute Q1, Median, Q3 for each column
stats = {
    "25%": {},
    "50%": {},
    "75%": {}
}

for col_name in cont_cols:
    q1, median, q3 = filtered_df.approxQuantile(col_name, [0.25, 0.5, 0.75], 0.01)
    stats["25%"][col_name] = str(q1)
    stats["50%"][col_name] = str(median)
    stats["75%"][col_name] = str(q3)

# convert new rows to df rows
new_rows = [Row(summary=stat_name, **cols) for stat_name, cols in stats.items()]
quartile_df = spark.createDataFrame(new_rows)

# append the new rows to describe_df
full_summary_df = describe_df.unionByName(quartile_df)

In [None]:
# save df to not have to recalc results later
#print(os.getcwd())
full_summary_df.coalesce(1).write.mode("overwrite").option("header", True).csv("summary_output")

In [None]:
#full_summary_df = spark.read.csv('full_summary_df.csv', sep = ',', inferSchema = True, header = True)

In [None]:
full_summary_df.select(full_summary_df.columns[:6]).show(truncate=False)

In [None]:
# view df columns
full_summary_df.select(full_summary_df.columns[11:17]).show(truncate=False)

### Explore skewed data

mean > median, data is right-skewed (longer tail on the right)
median < mean, data is left-skewed (longer tail on the left)

This code is to find top 20 features with largest skews. These features will then be plotted in histograms

The purpose of doing this is to understand if there are any outliers in the dataset that may be worth removing from the dataset prior to applying ML methods

### Explore data distributions

In [None]:
# get mean and median rows as dicts
mean_row = full_summary_df.filter(col("summary") == "mean").collect()[0].asDict()
median_row = full_summary_df.filter(col("summary") == "50%").collect()[0].asDict()

# skip the 'summary' key
cols = [col for col in mean_row.keys() if col != "summary"]

# build rows of (column, absolute_diff, skew direction)
result_rows = []
for c in cols: # for each col
    mean_val = float(mean_row[c]) # get mean
    median_val = float(median_row[c]) # get median
    diff = __builtins__.abs(mean_val - median_val) # get abs difference
    skew = "right" if mean_val > median_val else "left" if mean_val < median_val else "none" # get skew direction
    result_rows.append(Row(column=c, absolute_diff=diff, skew=skew)) # aggregate

# create df
diff_df = spark.createDataFrame(result_rows)

# get top 20
top_skewed = diff_df.orderBy(col("absolute_diff").desc()).limit(20)

top_skewed.show(truncate=False)


In [None]:
# get all cols in filtered_df
skewed_cols = [row['column'] for row in top_skewed.select('column').collect()]

# remove any white space
skewed_cols = [str(c).strip() for c in skewed_cols]

# subset column description dataframe for only columns in filtered dataset
skewed_col_des = col_des.filter(col('column').isin(skewed_cols))

In [None]:
skewed_col_des.show(n=skewed_col_des.count(), truncate=False)

In [None]:
# list of columns from 'top_skewed'
columns_to_plot = [row['column'] for row in top_skewed.collect()]

# filter the columns that exist in filtered_df
valid_columns = [col for col in columns_to_plot if col in filtered_df.columns]

# plot histograms for each column
n_cols = 4  # 4 histograms per row
n_rows = (len(valid_columns) + n_cols - 1) // n_cols  # calculate num rows needed

fig, axes = plt.subplots(n_rows, n_cols, figsize=(20, n_rows * 5))

# flatten axes for easier indexing
axes = axes.flatten()

# loop through cols and plot
for i, column in enumerate(valid_columns):
    hist = filtered_df.select(column).rdd.flatMap(lambda x: x).histogram(20)  # 20 bins
    
    bin_edges, bin_counts = hist

    # plot the histogram using the bin edges and counts
    axes[i].bar(bin_edges[:-1], bin_counts, width=(bin_edges[1] - bin_edges[0]), edgecolor='black')

    # set axes & title
    axes[i].set_title(f"Histogram of {column}")
    axes[i].set_xlabel('Value')
    axes[i].set_ylabel('Frequency')

# turn off any unused subplots
for i in range(len(valid_columns), len(axes)):
    axes[i].axis('off')

plt.tight_layout()
plt.show()

### Discussion

The distance column, majority of flights in this dataset have a distance <1000 miles. With a few outliers ranging from 3000-5000 miles. 

Wheels On & Wheels Off time and CRSDepTime & DepTime columns have a few outliers at 0:00-4:00am, majority of times are listed between 5:00 & 23:59

The majority of TaxiOut and TaxiIn times are around 0 (or <50minutes). However, there are outliers sitting at ~1300 & 300 minutes respectively. 

## Which Origin Cities had the most delayed flights?

In [None]:
count_delay = my_df.select(["Origin", "DepDelay"]).groupBy("Origin")\
        .agg(count(F.when(col("DepDelay") > 0, 1)).alias("DelayCount"), 
             count(F.when(col("DepDelay") < 0, 1)).alias("EarlyCount"),
            count("*").alias("TotalCount")).orderBy(col("TotalCount").desc())
pandas_delay = count_delay.toPandas()

In [None]:
pdf = pandas_delay.copy()
pdf["OnTimeCount"] = pdf["TotalCount"] - pdf["DelayCount"] - pdf["EarlyCount"]

In [None]:
top_20 = pdf.head(20)
top_20

In [None]:
count_delay = my_df.select(["Origin", "DepDelay"]).groupBy("Origin")\
        .agg(count(F.when(col("DepDelay") > 0, 1)).alias("DelayCount"), 
             count(F.when(col("DepDelay") < 0, 1)).alias("EarlyCount"),
            count("*").alias("TotalCount")).orderBy(col("TotalCount").desc())
pandas_delay = count_delay.toPandas()# Assuming pdf has these columns: OriginCity, DelayedFlights, EarlyFlights, OnTimeFlights

# Bar positions
cities = top_20["Origin"]
x = np.arange(len(cities))

# Heights
early = top_20["EarlyCount"]
on_time = top_20["OnTimeCount"]
delayed = top_20["DelayCount"]

# Plot
plt.figure(figsize=(12, 6))
plt.bar(x, early, label="Early", color="green")
plt.bar(x, on_time, bottom=early, label="On Time", color="gray")
plt.bar(x, delayed, bottom=early + on_time, label="Delayed", color="red")

# Labels and formatting
plt.xticks(x, cities, rotation=45)
plt.ylabel("Number of Flights")
plt.title("Flight Status by Origin City (Top 20)")
plt.legend(title="Flight Status")
plt.tight_layout()
plt.show()

In [None]:
year_delay = my_df.select(["Year", "DepDelay"]).groupBy("Origin")\
        .agg(count(F.when(col("DepDelay") > 0, 1)).alias("DelayCount"), 
             count(F.when(col("DepDelay") < 0, 1)).alias("EarlyCount"),
            count("*").alias("TotalCount")).orderBy(col("TotalCount").desc())
pandas_year_delay = count_delay.toPandas()

In [None]:
cols_to_keep_2 = ["Airline", "Origin", "Dest", "ArrDelayMinutes", "DepDelayMinutes", "Distance", "OriginCityName", "DestCityName"]
df2 = newdf.select(cols_to_keep_2)

## Which routes had the most delays?

In [None]:
# group by origin and city, then calculating the total average delay between the cities
route_delays = df2.groupBy("OriginCityName", "DestCityName") \
    .agg(
        (f.avg("DepDelayMinutes") + f.avg("ArrDelayMinutes")).alias("AvgTotalDelay")
    ) \
    .orderBy(f.col("AvgTotalDelay").desc())

In [None]:
# convert to pandas
route_delays_pd = route_delays.limit(10).toPandas()

In [None]:
# combining origin and dest for visual purposes 
route_delays_pd['Route'] = route_delays_pd['OriginCityName'] + ' to ' + route_delays_pd['DestCityName']

In [None]:
# plot
plt.figure(figsize=(14, 8))
sns.barplot(x='AvgTotalDelay', y='Route', data=route_delays_pd)
plt.title('Top 10 Most Delayed Flight Routes')
plt.xlabel('Average Total Delay in Minutes')
plt.ylabel('Route (Origin to Destination)')
plt.xticks(rotation=0)
plt.show()

## Which airlines experience the most delays?

In [None]:
# combining departure delay and arrival delay to one column
df2 = df2.withColumn('TotalDelay', f.col('DepDelayMinutes') + f.col('ArrDelayMinutes'))

In [None]:
# selecting only delays that are over 0
delayed_flights = df2.filter(df2['TotalDelay'] > 0)

In [None]:
# group by airline, then calculating the total average delay
total_delay = delayed_flights.groupBy("Airline").agg(
    f.avg("TotalDelay").alias("TotalDelayMinutes")
).orderBy(f.col("TotalDelayMinutes").desc())

In [None]:
# convert to pandas
total_delay_pd = total_delay.limit(10).toPandas()

In [None]:
total_delay_pd.head(10)

In [None]:
# plot
plt.figure(figsize=(12, 8))
sns.barplot(x="TotalDelayMinutes", y="Airline", data=total_delay_pd)
plt.title('Top 10 Airlines with the Most Delay in Minutes')
plt.xlabel('Average Total Delay in Minutes')
plt.ylabel('Airline')
plt.show()

## Do flights with a longer distance have longer departure delays?

In [None]:
# convert to pandas
distance_delays_pd = df2.select("Distance", "DepDelayMinutes").toPandas()

In [None]:
# convert to numeric to prevent error
distance_delays_pd['Distance'] = pd.to_numeric(distance_delays_pd['Distance'], errors='coerce')
distance_delays_pd['DepDelayMinutes'] = pd.to_numeric(distance_delays_pd['DepDelayMinutes'], errors='coerce')

In [None]:
# removing any NaNs and 0 values
distance_delays_pd = distance_delays_pd.dropna(subset=['Distance', 'DepDelayMinutes'])
distance_delays_pd = distance_delays_pd[(distance_delays_pd['Distance'] > 0) & (distance_delays_pd['DepDelayMinutes'] > 0)]

In [None]:
# plot
plt.figure(figsize=(12, 8))
sns.scatterplot(x='Distance', y='DepDelayMinutes', data= distance_delays_pd, alpha=0.6)
plt.title('Flight Distance vs Departure Delay')
plt.xlabel('Flight Distance in Miles')
plt.ylabel('Departure Delay in Minutes')
plt.show()