In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create a SparkSession
spark = SparkSession.builder.appName("App").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

ModuleNotFoundError: No module named 'pyspark'

## Load CSV File with Header

In [None]:
# Load the CSV file with the first row as a header
df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load("1987.csv")

# Display the columns and the first 15 rows
df.show(15, truncate=False)

In [None]:
# Iterate over all columns in the DataFrame
for column in df.columns:
    df = df.withColumn(column, when(col(column) == "NA", None).otherwise(col(column)))

df.show()

In [None]:
from pyspark.sql.functions import col

# Counts the number of null values for each column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Preprocessing
- eliminate unnecesary variables
- missing and duplicates values
- see correlation
- variable transformation
- variable creation

In [None]:
# columns to eliminate
columns = [
    "ArrTime", 
    "ActualElapsedTime", 
    "AirTime", 
    "TaxiIn", 
    "Diverted", 
    "CarrierDelay", 
    "WeatherDelay", 
    "NASDelay", 
    "SecurityDelay", 
    "LateAircraftDelay"
]

# Eliminate columns
df = df.drop(*columns)


In [None]:
# columns to eliminate
columns = [
    "Year",
    "TailNum",
    "TaxiOut",
    "Cancelled",
    "CancellationCode"  
]

# Eliminate columns
df = df.drop(*columns)

## Missing values

In [None]:
# Contar valores NA por columna
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

#Now we check if NA stands for 0. If this value is not present, means that NA was 0.

In [None]:
from pyspark.sql import functions as F
# Filter the DataFrame to keep only rows where ArrDelay is equal to 0
filtered_df = df.filter(F.col("ArrDelay") == 0)

# Total number of rows in the DataFrame
total_rows = df.count()

# Check if there are any rows in the filtered DataFrame
if filtered_df.count() > 0:
    print("0 is present in the ArrDelay column " + str(filtered_df.count()) + " times out of " + str(total_rows) + ".")
else:
    print("0 is not present in the ArrDelay column.")

In [None]:
# Calculate the percentage of null values for each column
null_percentage = df.select([(count(when(col(c).isNull(), c)) / total_rows).alias(c) for c in df.columns])

# Show the percentage of null values for each column
null_percentage.show()

In [None]:
# Drop rows with at least one missing value

df = df.dropna()
dropped_rows = total_rows - df.count()
print("Dropped "+ str(dropped_rows)+ " rows.")

## Duplicates

In [None]:
# Check for duplicates and show the results
total_rows = df.count()
df = df.dropDuplicates()

if total_rows - df.count()  > 0:
    print("There are duplicates in the DataFrame.")
else:
    print("No duplicates found in the DataFrame.")

## Variable transformation

In [None]:
# List of columns to exclude from conversion
exclude_columns = ['UniqueCarrier', 'Origin', 'Dest']

# Convert all columns to integer type except the ones in exclude_columns
for column in df.columns:
    if column not in exclude_columns:
        df = df.withColumn(column, col(column).cast("integer"))

# Display the columns and the first 15 rows to verify the change
df.show(15, truncate=False)



In [None]:
# Print distinct values for each specified column
for column in exclude_columns:
    print(f"Distinct values in column '{column}':")
    distinct_values = df.select(column).distinct().collect()
    for value in distinct_values:
        print(value[column])
    print("\n")  # Adding a newline for better readability
    # Print the number of elements in the distinct_values list
    print(f"Number of distinct values: {len(distinct_values)}")

## Import geographic coordinates

In [None]:
# Read the CSV file
usa_airport_df = spark.read.csv("us-airports.csv", header=True)

# Select only the 'latitude_deg', 'longitude_deg', and 'iata_code' columns
usa_airport_df = usa_airport_df.select("latitude_deg", "longitude_deg", "iata_code")

usa_airport_df.show()

In [None]:
# Select and concatenate the values from 'Origin' and 'Destination' columns
origin = df.select("Origin").distinct()
destination = df.select("Dest").distinct()

all_airports = origin.distinct().union(destination)

# Get distinct values
distinct_airports = all_airports.distinct()
print(all_airports.count())



# Collect the distinct values

#Distinct airport codes in the new Dataset.
usa_iata_codes = usa_airport_df.select("iata_code").distinct()


#Show the missing values

distinct_airports.subtract(usa_iata_codes).show()


In [None]:
import re

# Function to convert DMS (degrees, minutes, seconds) to DD (decimal degrees)
def dms_to_dd(dms):
    parts = re.split('[°′″]', dms)
    degrees = float(parts[0])
    minutes = float(parts[1])
    seconds = float(parts[2])
    direction = parts[3]
    
    dd = degrees + minutes/60 + seconds/3600
    if direction in ('S', 'W'):
        dd *= -1
    return dd

# Coordinates in DMS format with corresponding  codes
coordinates = [
    ("43°08′36″N", "075°22′48″W","UCA"),
    ("18°20′14″N", "064°58′24″W","STT"),
    ("7°22′02″N", "134°32′39″E","ROR"),
    ("17°42′16″N", "64°48′06″W","STX"),
    ("9°29′56″N", "138°04′57″E","YAP"),
    ("13°29′02″N", "144°47′50″W","GUM"),
    ("15°07′08″N", "145°43′46″E","SPN"),
    ("30°12′44″N", "085°40′58″W","PFN"),
    ("18°26′22″N", "66°00′07″W","SJU")
]

# Convert the DMS coordinates to decimal degrees
airport_coordinates_dd = [(dms_to_dd(lat), dms_to_dd(lon),code) for  lat, lon ,code in coordinates]

new_airports_df = spark.createDataFrame(airport_coordinates_dd, ["latitude_deg", "longitude_deg", "iata_code"])
new_airports_df.show()

usa_airport_df = usa_airport_df.union(new_airports_df).distinct()

In [None]:
#Check that now we have coordinates for each airport
distinct_airports.subtract(usa_airport_df.select("iata_code")).show()

In [None]:
# Now we merge the two dataframes
flights_with_origin = df.join(usa_airport_df, df["Origin"] == usa_airport_df["iata_code"])
flights_with_origin = flights_with_origin.withColumnRenamed("latitude_deg", "Origin_Lat").withColumnRenamed("longitude_deg", "Origin_Long")

#flights_with_origin.show()


In [None]:
#adding destination coordinates
usa_airport_df = usa_airport_df.distinct()
flights_with_dest = flights_with_origin.join(usa_airport_df, flights_with_origin["Dest"] == usa_airport_df["iata_code"])
flights_with_dest = flights_with_dest.withColumnRenamed("latitude_deg", "Dest_Lat").withColumnRenamed("longitude_deg", "Dest_Long")

#flights_with_dest.show()


In [None]:
#Now we have the coordinates of the airports instead of the codes.

merged_df = flights_with_dest.drop("iata_code","Origin","Dest")

#merged_df.show()

## Use OneHotEncoder for UniqueCarrier

In [None]:
#Now we transform the 'UniqueCarrrier' feature in a OneHotEncoder

from pyspark.ml.feature import StringIndexer, OneHotEncoder
#from pyspark.ml import Pipeline

# Create a StringIndexer
indexer = StringIndexer(inputCol="UniqueCarrier", outputCol="UniqueCarrierIndex")

# Fit the indexer to the DataFrame and transform it
df_indexed = indexer.fit(merged_df).transform(merged_df)

# Create a OneHotEncoder
encoder = OneHotEncoder(inputCols=["UniqueCarrierIndex"], outputCols=["UniqueCarrierVec"])

# Apply the encoder to the DataFrame
df_encoded = encoder.fit(df_indexed).transform(df_indexed)
df_encoded.select("UniqueCarrier", "UniqueCarrierVec").distinct().show(truncate=False)


In [None]:
#Print Results
df_encoded.select("UniqueCarrier", "UniqueCarrierVec").distinct().show(truncate=False)

In [None]:
final_df = df_encoded.drop("UniqueCarrier")


# Convert "Origini_Lat" from string to double
final_df = final_df.withColumn("Origin_Lat", final_df["Origin_Lat"].cast("double"))

# Convert "Origini_Long" from string to double
final_df = final_df.withColumn("Origin_Long", final_df["Origin_Long"].cast("double"))

# Convert "Dest_Lat" from string to double
final_df = final_df.withColumn("Dest_Lat", final_df["Dest_Lat"].cast("double"))

# Convert "Dest_Long" from string to double
final_df = final_df.withColumn("Dest_Long", final_df["Dest_Long"].cast("double"))



## Plot to check the coordinates values

In [None]:
# Plot the coordinates to inspect data
lat_long_df = final_df.select("Origin_Lat", "Origin_Long").toPandas()

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
# Taking a random sample of the data if it's too large
sample_df = lat_long_df.sample(frac=1)  # Adjust frac as needed

# Convert the 'Origin_Lat' and 'Origin_Long' columns to numeric (floats)
sample_df['Origin_Lat'] = pd.to_numeric(sample_df['Origin_Lat'], errors='coerce')
sample_df['Origin_Long'] = pd.to_numeric(sample_df['Origin_Long'], errors='coerce')

# Check the conversion
print(sample_df.dtypes)

# Now create the scatter plot
plt.figure(figsize=(10, 6))
plt.scatter(x=sample_df["Origin_Long"], y=sample_df["Origin_Lat"], alpha=1,s=2)
plt.title("Scatter Plot of Origin Coordinates")
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.grid(True)
plt.show()

In [None]:
import sys
print(sys.executable)


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Assuming your DataFrame is named sample_df and has been converted to numeric types

# Calculate Q1, Q3, and IQR for Latitude
Q1_lat = sample_df['Origin_Lat'].quantile(0.25)
Q3_lat = sample_df['Origin_Lat'].quantile(0.75)
IQR_lat = Q3_lat - Q1_lat

# Calculate Q1, Q3, and IQR for Longitude
Q1_long = sample_df['Origin_Long'].quantile(0.25)
Q3_long = sample_df['Origin_Long'].quantile(0.75)
IQR_long = Q3_long - Q1_long

# Define bounds for outliers
lower_bound_lat = Q1_lat - 1.5 * IQR_lat
upper_bound_lat = Q3_lat + 1.5 * IQR_lat
lower_bound_long = Q1_long - 1.5 * IQR_long
upper_bound_long = Q3_long + 1.5 * IQR_long

# Filter out outliers
filtered_df = sample_df[(sample_df['Origin_Lat'] >= lower_bound_lat) & 
                        (sample_df['Origin_Lat'] <= upper_bound_lat) &
                        (sample_df['Origin_Long'] >= lower_bound_long) & 
                        (sample_df['Origin_Long'] <= upper_bound_long)]

# Now create the scatter plot with smaller points and without outliers
plt.figure(figsize=(10, 6))
plt.scatter(x=filtered_df["Origin_Long"], y=filtered_df["Origin_Lat"], alpha=0.1, s=2)
plt.title("Scatter Plot of Origin Coordinates (Outliers Removed)")
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.grid(True)
plt.show()


## Correlation

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Select the numerical columns you want to include in the correlation analysis
numerical_cols = [col_name for col_name, data_type in final_df.dtypes if data_type == 'int' or data_type == 'double']



# Create a vector assembler to assemble the features
assembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")
assembled_df = assembler.transform(final_df)

# Calculate the correlation matrix
corr_matrix = Correlation.corr(assembled_df, "features").head()[0]


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Convert the correlation matrix to a Pandas DataFrame for visualization
corr_df = pd.DataFrame(corr_matrix.toArray(), columns=numerical_cols, index=numerical_cols)

# Create a heatmap
plt.figure(figsize=(10, 8))
sns.heatmap(corr_df, annot=True, cmap='coolwarm', fmt=".2f")
plt.title("Correlation Heatmap")
plt.show()


In [None]:
#We drop some columns that are higly correlated with another

final_df = final_df.drop("CRSElapsedTime","CRSElapsedTime","CRSArrTime","CRSDepTime")

final_df.printSchema()

## Variable creation

In [None]:
from pyspark.sql.functions import when, col

# 1. Define classes based on ArrDelay
# Categorize flights as "Not Late" (0) if ArrDelay <= 0, and "Late" (1) otherwise
final_df = final_df.withColumn("ArrDelayClass", when(final_df["ArrDelay"] <= 0, 0).otherwise(1))

# Count the frequency of each class in the 'label' column
class_counts = final_df.groupBy("ArrDelayClass").count().orderBy("ArrDelayClass")

# Show the class counts
class_counts.show()

In [None]:
# Import required functions from PySpark
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Calculate ratio
major_df = df.filter(col("ArrDelayClass") == 1)
minor_df = df.filter(col("ArrDelayClass") == 0)

# Calculate the oversampling ratio
ratio = int(major_df.count() / minor_df.count())
print(f"Oversampling ratio: {ratio}")

# Duplicate the minority rows based on the calculated ratio
oversampled_df = minor_df.withColumn("dummy", F.explode(F.array([F.lit(x) for x in range(ratio)]))).drop("dummy")

# Combine the oversampled minority rows with the major rows
combined_df = major_df.unionAll(oversampled_df)

# Count the frequency of each class in the 'ArrDelayClass' column after oversampling
class_counts = combined_df.groupBy("ArrDelayClass").count().orderBy("ArrDelayClass")

# Show the class counts
class_counts.show()


# Modeling

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [None]:
df.printSchema()

In [None]:
# Assuming your target variable is "ArrDelay" and your feature columns are selected
feature_columns = ["Month", "DayofMonth", "DayOfWeek", "DepTime", "FlightNum", "DepDelay", "UniqueCarrierIndex"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
assembled_data = assembler.transform(final_df)

In [None]:
# Assuming your target variable is "ArrDelay" and your feature columns are selected
feature_columns = ["Month", "DayofMonth", "DayOfWeek", "DepTime", "FlightNum", "DepDelay", "Distance", "Origin_Lat", "Origin_Long", "Dest_Lat", "Dest_Long", "UniqueCarrierIndex"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
assembled_data = assembler.transform(final_df)

In [None]:
train_data, test_data = assembled_data.randomSplit([0.7, 0.3], seed=123)

In [None]:
lr = LinearRegression(featuresCol="features", labelCol="ArrDelay",regParam=0.01)
lr_model = lr.fit(train_data)


In [None]:
test_results = lr_model.evaluate(test_data)

# Print evaluation metrics
print("Root Mean Squared Error (RMSE):", test_results.rootMeanSquaredError)
print("R-squared (R2):", test_results.r2)

In [None]:
# RandomForestClassifier

# set rf model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# instantiate pipeline
pipeline = Pipeline(stages=[assembled_data, rf])

# train model
model_rf = pipeline.fit(train_data)

# create prediction column on test data
results = model_rf.transform(test_data)

# evaluate results
# Define the hyperparameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

# Create the cross-validator
cross_validator = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy"),
                          numFolds=5, seed=42)

# Train the model with the best hyperparameters
cv_model = cross_validator.fit(train_data)

best_rf_model = cv_model.bestModel.stages[-1]
importances = best_rf_model.featureImportances


print("Feature Importances:")
for feature, importance in zip(feature_columns, importances):
    print(f"{feature}: {importance:.4f}")

# Make predictions on the test data
predictions = cv_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")

# Evaluate the model
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = {:.2f}".format(accuracy))
    

# Validation

##Close the context

In [None]:
spark.stop()