### Imports

In [2]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.types import StringType, NumericType, IntegerType, DoubleType
from pyspark.sql.functions import col, count, isnan, when, skewness, kurtosis, lit, percent_rank, udf, mean, concat, avg, format_string, concat_ws, hour, to_timestamp
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

### Session start and Data load

In [3]:
# Create a Spark session
spark = SparkSession.builder.appName("DataLoadingExample").getOrCreate()

# Get the input data location from the command line or configuration
input_data_location = "data/1987.csv"

# Load the data into a PySpark DataFrame
df = spark.read.csv(input_data_location, header=True, inferSchema=True)

your 131072x1 screen size is bogus. expect trouble
24/01/15 04:58:56 WARN Utils: Your hostname, PCAsusNathanG resolves to a loopback address: 127.0.1.1; using 172.20.198.4 instead (on interface eth0)
24/01/15 04:58:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/15 04:58:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

### Data Preprocessing

In [4]:
# List of columns to be removed
columns_to_remove = ['ArrTime', 'ActualElapsedTime', 'AirTime', 'TaxiIn', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']


# Remove columns with only one unique value
for c in [c for c in df.columns if c not in columns_to_remove]:
    if df.select(c).distinct().count() == 1:
        print("Column '{}' has only one unique value".format(c))
        columns_to_remove.append(c)
        
# Select columns that are NOT in the 'columns_to_remove' list
df = df.select([c for c in df.columns if c not in columns_to_remove])

# Identify numerical and categorical columns
categorical_cols = ['UniqueCarrier', 'Origin', 'Dest']
numerical_cols = [x for x in df.columns if x not in categorical_cols]

target_var = 'ArrDelay'

# Remove the target variable 'ArrDelay' from the lists
if target_var in numerical_cols:
    numerical_cols.remove(target_var)
if target_var in categorical_cols:
    categorical_cols.remove(target_var)
    
# Print the lists
print("Numerical Columns:", numerical_cols)
print("Categorical Columns:", categorical_cols)


                                                                                

Column 'Year' has only one unique value


                                                                                

Column 'TailNum' has only one unique value


                                                                                

Column 'TaxiOut' has only one unique value


                                                                                

Column 'CancellationCode' has only one unique value
Numerical Columns: ['Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'CRSArrTime', 'FlightNum', 'CRSElapsedTime', 'DepDelay', 'Distance', 'Cancelled']
Categorical Columns: ['UniqueCarrier', 'Origin', 'Dest']


In [5]:
# Label encode categorical columns
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_encoded") for c in categorical_cols]
pipeline = Pipeline(stages=indexers)
df_encoded = pipeline.fit(df).transform(df)

# Drop the original categorical columns
df_encoded = df_encoded.drop(*categorical_cols)

# Show the DataFrame with label-encoded categorical columns
df_encoded.show()

                                                                                

+-----+----------+---------+-------+----------+----------+---------+--------------+--------+--------+--------+---------+---------------------+--------------+------------+
|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|FlightNum|CRSElapsedTime|ArrDelay|DepDelay|Distance|Cancelled|UniqueCarrier_encoded|Origin_encoded|Dest_encoded|
+-----+----------+---------+-------+----------+----------+---------+--------------+--------+--------+--------+---------+---------------------+--------------+------------+
|   10|        14|        3|    741|       730|       849|     1451|            79|      23|      11|     447|        0|                 11.0|          28.0|         5.0|
|   10|        15|        4|    729|       730|       849|     1451|            79|      14|      -1|     447|        0|                 11.0|          28.0|         5.0|
|   10|        17|        6|    741|       730|       849|     1451|            79|      29|      11|     447|        0|                 11.0|   

### Data Exploration

In [6]:
# Calculating summary statistics for numerical columns
print("Summary statistics for numerical columns :")
df_encoded.describe(numerical_cols).show()

# Analyzing frequency counts for encoded categorical columns
encoded_categorical_cols = [f"{c}_encoded" for c in categorical_cols]
for col_name in encoded_categorical_cols:
    print(f"Frequency counts for {col_name} :")
    df_encoded.groupBy(col_name).count().orderBy('count', ascending=False).show()

# Checking for missing values in each column
print("Missing values in each column:")
df_encoded.select([count(when(col(c).isNull(), c)).alias(c) for c in df_encoded.columns]).show()

Summary statistics for numerical columns :


24/01/15 05:01:17 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+--------------------+
|summary|             Month|        DayofMonth|         DayOfWeek|           DepTime|        CRSDepTime|        CRSArrTime|        FlightNum|    CRSElapsedTime|          DepDelay|         Distance|           Cancelled|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+--------------------+
|  count|           1311826|           1311826|           1311826|           1311826|           1311826|           1311826|          1311826|           1311826|           1311826|          1311826|             1311826|
|   mean|10.993736211967136|15.717608890203426|3.9490427846375966|1369.2755101803905| 1361.130906080532|1491.0518346183107|6

                                                                                

+---------------------+------+
|UniqueCarrier_encoded| count|
+---------------------+------+
|                  0.0|185813|
|                  1.0|165121|
|                  2.0|152624|
|                  3.0|123002|
|                  4.0|116482|
|                  5.0|108776|
|                  6.0|108273|
|                  7.0| 94814|
|                  8.0| 69650|
|                  9.0| 61975|
|                 10.0| 45399|
|                 11.0| 41706|
|                 12.0| 21406|
|                 13.0| 16785|
+---------------------+------+

Frequency counts for Origin_encoded :


                                                                                

+--------------+-----+
|Origin_encoded|count|
+--------------+-----+
|           0.0|67216|
|           1.0|66309|
|           2.0|51860|
|           3.0|45646|
|           4.0|43376|
|           5.0|35155|
|           6.0|32097|
|           7.0|30991|
|           8.0|29848|
|           9.0|28765|
|          10.0|28596|
|          11.0|27548|
|          12.0|25250|
|          13.0|24518|
|          14.0|23108|
|          15.0|22016|
|          16.0|21566|
|          17.0|20570|
|          18.0|19239|
|          19.0|19081|
+--------------+-----+
only showing top 20 rows

Frequency counts for Dest_encoded :


                                                                                

+------------+-----+
|Dest_encoded|count|
+------------+-----+
|         0.0|67830|
|         1.0|66783|
|         2.0|52450|
|         3.0|45597|
|         4.0|44284|
|         5.0|34948|
|         6.0|32690|
|         7.0|31058|
|         8.0|30234|
|         9.0|29296|
|        10.0|28456|
|        11.0|28452|
|        12.0|25056|
|        13.0|24664|
|        14.0|23601|
|        15.0|21979|
|        16.0|21830|
|        17.0|20474|
|        18.0|19841|
|        19.0|19149|
+------------+-----+
only showing top 20 rows

Missing values in each column:




+-----+----------+---------+-------+----------+----------+---------+--------------+--------+--------+--------+---------+---------------------+--------------+------------+
|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|FlightNum|CRSElapsedTime|ArrDelay|DepDelay|Distance|Cancelled|UniqueCarrier_encoded|Origin_encoded|Dest_encoded|
+-----+----------+---------+-------+----------+----------+---------+--------------+--------+--------+--------+---------+---------------------+--------------+------------+
|    0|         0|        0|      0|         0|         0|        0|             0|       0|       0|       0|        0|                    0|             0|           0|
+-----+----------+---------+-------+----------+----------+---------+--------------+--------+--------+--------+---------+---------------------+--------------+------------+



                                                                                

In [7]:
# Calculate skewness for numerical columns
for c in numerical_cols:
    skewness_value = df_encoded.select(skewness(c)).collect()[0][0]
    print(f"Skewness of {c}: {skewness_value}")

# Calculate kurtosis for numerical columns
for c in numerical_cols:
    kurtosis_value = df_encoded.select(kurtosis(c)).collect()[0][0]
    print(f"Kurtosis of {c}: {kurtosis_value}")

                                                                                

Skewness of Month: 0.011599187660117735


                                                                                

Skewness of DayofMonth: 0.01952750811776666


                                                                                

Skewness of DayOfWeek: 0.039007798482500175


                                                                                

Skewness of DepTime: -0.025141601840601643


                                                                                

Skewness of CRSDepTime: -0.0031534485545970043


                                                                                

Skewness of CRSArrTime: -0.2653019099069996


                                                                                

Skewness of FlightNum: 1.2344446971871021


                                                                                

Skewness of CRSElapsedTime: 1.8297136226969979


                                                                                

Skewness of DepDelay: 11.014230942786806


                                                                                

Skewness of Distance: 1.9919198527415016


                                                                                

Skewness of Cancelled: 7.978479083592336


                                                                                

Kurtosis of Month: -1.5242418620462475


                                                                                

Kurtosis of DayofMonth: -1.1909841994453017


                                                                                

Kurtosis of DayOfWeek: -1.2203884956540023


                                                                                

Kurtosis of DepTime: -0.8739366046995043


                                                                                

Kurtosis of CRSDepTime: -0.9126036848150698


                                                                                

Kurtosis of CRSArrTime: -0.5098235624138683


                                                                                

Kurtosis of FlightNum: 1.7022216487505473


                                                                                

Kurtosis of CRSElapsedTime: 5.988075465930576


                                                                                

Kurtosis of DepDelay: 512.295497518333


                                                                                

Kurtosis of Distance: 5.806106453377616




Kurtosis of Cancelled: 61.656128487320885


                                                                                

- Skewness measures the asymmetry of the probability distribution of a real-valued random variable about its mean. If skewness is 0, the data are perfectly symmetrical, although it is quite unlikely for real-world data. A skewness value greater than 0 means that there is more weight in the left tail of the distribution.

- Kurtosis indicates how the tails of a distribution differ from the tails of a normal distribution. High kurtosis means that the tails are fat, and there is a sharp peak (more outliers), whereas low kurtosis indicates light tails and a flat peak (less outlier-prone).

From the results, we can see some variables with high skewness or kurtosis values, such as `Cancelled` and `Distance`, which may benefit from data transformation before being used in a machine learning model.

In [8]:
# Filter out non-numerical columns based on the actual data type
actual_numerical_cols = [f.name for f in df_encoded.schema.fields if isinstance(f.dataType, NumericType)]

# Define the bounds for the IQR
bounds = {
    c: dict(
        zip(["q1", "q3"], df_encoded.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in actual_numerical_cols
}

for c in actual_numerical_cols:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    lower_bound = bounds[c]['q1'] - (1.5 * iqr)
    upper_bound = bounds[c]['q3'] + (1.5 * iqr)
    
    print(f"Column {c}:")
    print(f"    Lower bound: {lower_bound}")
    print(f"    Upper bound: {upper_bound}")
    
    # Optional: Filter out the outliers from the DataFrame
    df_no_outliers = df_encoded.filter((col(c) >= lower_bound) & (col(c) <= upper_bound))
    
    # Optional: View the count of identified outliers
    outliers_count = df_encoded.filter((col(c) < lower_bound) | (col(c) > upper_bound)).count()
    print(f"    Identified outliers: {outliers_count}")

                                                                                

Column Month:
    Lower bound: 7.0
    Upper bound: 15.0


                                                                                

    Identified outliers: 0
Column DayofMonth:
    Lower bound: -14.5
    Upper bound: 45.5


                                                                                

    Identified outliers: 0
Column DayOfWeek:
    Lower bound: -4.0
    Upper bound: 12.0


                                                                                

    Identified outliers: 0
Column CRSDepTime:
    Lower bound: -263.0
    Upper bound: 2945.0


                                                                                

    Identified outliers: 0
Column CRSArrTime:
    Lower bound: -80.5
    Upper bound: 3107.5


                                                                                

    Identified outliers: 0
Column FlightNum:
    Lower bound: -613.5
    Upper bound: 1798.5


                                                                                

    Identified outliers: 55606
Column CRSElapsedTime:
    Lower bound: -42.0
    Upper bound: 230.0


                                                                                

    Identified outliers: 60988
Column Cancelled:
    Lower bound: 0.0
    Upper bound: 0.0


                                                                                

    Identified outliers: 19685
Column UniqueCarrier_encoded:
    Lower bound: -8.0
    Upper bound: 16.0


                                                                                

    Identified outliers: 0
Column Origin_encoded:
    Lower bound: -51.0
    Upper bound: 101.0


                                                                                

    Identified outliers: 86721
Column Dest_encoded:
    Lower bound: -51.0
    Upper bound: 101.0




    Identified outliers: 85084


                                                                                

1. **No Outliers for Some Variables:** For columns like `Month`, `DayofMonth`, `DayOfWeek`, `CRSDepTime`, `CRSArrTime`, `UniqueCarrier_encoded`, no outliers were identified based on the IQR method. This is expected for `Month`, `DayofMonth`, and `DayOfWeek` since their values fall within a specific range.

2. **High Number of Outliers for Certain Variables:** For `FlightNum`, `CRSElapsedTime`, `Cancelled`, `Origin_encoded` and `Dest_encoded`, a significant number of outliers have been identified.

### Domain Knowledge

1. **Cancelled Flights:** For the `Cancelled` variable, since these flights do not have a delay, they should be handled appropriately. One approach is to remove cancelled flights from the dataset when building a model to predict delays, as they don't contribute to the delay prediction.

2. **CRSElapsedTime Extremes:** Extreme values in `CRSElapsedTime` represent long or short flights. These should not be considered outliers in a statistical sense but rather as part of the natural variability of flight durations.

3. **Origin, Destination and Flight number:** Since the encoded values for `Origin_encoded` and `Dest_encoded` reflect actual airports and `FlightNum` give insights on specific flight lines, we'll retain them as they are. These features can be quite informative for delay predictions as some airports and specific flight might have usual higher delays. 

In [9]:
# Filterring canceled flight
# Count the number of canceled flights before filtering
cancelled_flights_count_before = df_encoded.filter(col('Cancelled') == 1).count()
print(f"Number of canceled flights before filtering: {cancelled_flights_count_before}")

# Remove canceled flights
df_not_cancelled = df_encoded.filter(col('Cancelled') == 0)

# Count the number of canceled flights after filtering
cancelled_flights_count_after = df_not_cancelled.filter(col('Cancelled') == 1).count()
print(f"Number of canceled flights after filtering: {cancelled_flights_count_after}")

# Drop the 'Cancelled' column from the DataFrame
df_encoded = df_not_cancelled.drop('Cancelled')

                                                                                

Number of canceled flights before filtering: 19685




Number of canceled flights after filtering: 0


                                                                                

### Feature Engineering

In [10]:
# UDF to classify the time of day based on DepTime in hhmm format
def get_part_of_day(deptime):
    if deptime is None:
        return None
    hour = int(deptime) // 100
    if 5 <= hour <= 11:
        return 'Morning'
    elif 12 <= hour <= 17:
        return 'Afternoon'
    elif 18 <= hour <= 22:
        return 'Evening'
    else:
        return 'Night'

part_of_day_udf = udf(get_part_of_day, StringType())

# Now, let's recast the DepTime column to IntegerType to handle cases where it's not an integer.
df_encoded = df_encoded.withColumn('DepTime', col('DepTime').cast(IntegerType()))

# Then use the UDF to create the PartOfDay column
df_encoded = df_encoded.withColumn('PartOfDay', part_of_day_udf(col('DepTime')))

# Add IsWeekend column
df_encoded = df_encoded.withColumn('IsWeekend', when(col('DayOfWeek').isin([6, 7]), 1).otherwise(0))



# Add Season column based on the month
df_encoded = df_encoded.withColumn('Season', when(col('Month').isin([12, 1, 2]), 'Winter')
                                      .when(col('Month').isin([3, 4, 5]), 'Spring')
                                      .when(col('Month').isin([6, 7, 8]), 'Summer')
                                      .otherwise('Autumn'))

# Create a 'FlightDate' key using 'Month' and 'DayofMonth' (as we only have data for 1987)
df_encoded = df_encoded.withColumn('FlightDate', 
                                   concat_ws("-", col('Month').cast(StringType()), col('DayofMonth').cast(StringType())))

# Window for calculating the daily flight volume
daily_volume_window = Window.partitionBy('FlightDate')

# Add DailyFlightVolume column (counting the number of flights per day)
df_encoded = df_encoded.withColumn('DailyFlightVolume', count('*').over(daily_volume_window))

# Show the DataFrame with new features
df_encoded.select('FlightDate', 'PartOfDay', 'IsWeekend', 'Season', 'DailyFlightVolume').show(truncate=False)

                                                                                

+----------+---------+---------+------+-----------------+
|FlightDate|PartOfDay|IsWeekend|Season|DailyFlightVolume|
+----------+---------+---------+------+-----------------+
|10-24     |Morning  |1        |Autumn|13351            |
|10-24     |Morning  |1        |Autumn|13351            |
|10-24     |Afternoon|1        |Autumn|13351            |
|10-24     |Morning  |1        |Autumn|13351            |
|10-24     |Morning  |1        |Autumn|13351            |
|10-24     |Morning  |1        |Autumn|13351            |
|10-24     |Evening  |1        |Autumn|13351            |
|10-24     |Evening  |1        |Autumn|13351            |
|10-24     |Morning  |1        |Autumn|13351            |
|10-24     |Evening  |1        |Autumn|13351            |
|10-24     |Morning  |1        |Autumn|13351            |
|10-24     |Afternoon|1        |Autumn|13351            |
|10-24     |Afternoon|1        |Autumn|13351            |
|10-24     |Afternoon|1        |Autumn|13351            |
|10-24     |Mo

In [11]:
# Convert columns to the correct data types
df_encoded = df_encoded.withColumn("DepDelay", col("DepDelay").cast(DoubleType()))
df_encoded = df_encoded.withColumn("Distance", col("Distance").cast(DoubleType()))
df_encoded = df_encoded.withColumn('ArrDelay', col('ArrDelay').cast(DoubleType()))

# Define the columns to be indexed and encoded
categorical_cols_to_encode = ['Season', 'PartOfDay']

# Create a list to hold the stages of the pipeline
stages = []

# Iterate over the columns to create indexing and encoding stages
for categorical_col in categorical_cols_to_encode:
    # Create a StringIndexer
    string_indexer = StringIndexer(inputCol=categorical_col, outputCol=categorical_col + "Index")
    
    # Create a OneHotEncoder
    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[categorical_col + "Vec"])
    
    # Add the indexers and encoders to our pipeline stages
    stages += [string_indexer, encoder]

# Create the pipeline
pipeline = Pipeline(stages=stages)

# Fit the pipeline to the data
pipeline_model = pipeline.fit(df_encoded)

# Transform the data
df_encoded = pipeline_model.transform(df_encoded)

# Now that we have our encoded features, we can remove the original categorical columns
df_encoded = df_encoded.drop(*categorical_cols_to_encode)

# And we can also remove the intermediate index columns
for categorical_col in categorical_cols_to_encode:
    df_encoded = df_encoded.drop(categorical_col + "Index")

# Show the DataFrame with the new encoded columns
df_encoded.show(truncate=False)

[Stage 279:>                                                        (0 + 1) / 1]

+-----+----------+---------+-------+----------+----------+---------+--------------+--------+--------+--------+---------------------+--------------+------------+---------+----------+-----------------+-------------+-------------+
|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|FlightNum|CRSElapsedTime|ArrDelay|DepDelay|Distance|UniqueCarrier_encoded|Origin_encoded|Dest_encoded|IsWeekend|FlightDate|DailyFlightVolume|SeasonVec    |PartOfDayVec |
+-----+----------+---------+-------+----------+----------+---------+--------------+--------+--------+--------+---------------------+--------------+------------+---------+----------+-----------------+-------------+-------------+
|10   |24        |6        |744    |730       |849       |1451     |79            |19.0    |14.0    |447.0   |11.0                 |28.0          |5.0         |1        |10-24     |13351            |(1,[0],[1.0])|(3,[1],[1.0])|
|10   |24        |6        |929    |915       |1001      |1451     |46            |51.0 

                                                                                

In [12]:
# Generate a check for each column if there is any null value
null_checks = [count(when(col(c).isNull(), c)).alias(c) for c in df_encoded.columns]

# Apply the checks to the DataFrame
null_counts = df_encoded.select(*null_checks).collect()[0].asDict()

# Print out the counts of nulls for each column
for column, null_count in null_counts.items():
    if null_count != 0: 
        print(f"Column {column} has {null_count} null values")

                                                                                

Column ArrDelay has 3815 null values
Column Distance has 994 null values


In [13]:
total_lines = df_encoded.count()
print(f"The total number of lines in the DataFrame is: {total_lines}")



The total number of lines in the DataFrame is: 1292141


                                                                                

In [14]:
# List of columns to check for null values
columns_to_check = ['ArrDelay', 'Distance']

# Drop rows that have null values in the specified columns
df_encoded = df_encoded.na.drop(subset=columns_to_check)

new_total_lines = df_encoded.count()

# Show the number of lines remaining after removing rows with nulls in specific columns
print(f"The number of lines after removing rows with null values in specified columns: {new_total_lines}")
print(f"\nTotal number of deleted lines : {total_lines - new_total_lines}")



The number of lines after removing rows with null values in specified columns: 1287333

Total number of deleted lines : 4808


                                                                                

The dataframe is now cleaned and processed, all ready to be use by a prediction model!

### Model Training

In [15]:
# Define the features and target variable
feature_cols = ['Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'CRSArrTime', 'FlightNum',
                'CRSElapsedTime', 'DepDelay', 'Distance', 'UniqueCarrier_encoded', 'Origin_encoded',
                'Dest_encoded', 'PartOfDayVec', 'IsWeekend', 'SeasonVec', 'DailyFlightVolume']

target_col = 'ArrDelay'

# Assemble features into a single vector column
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="skip"  # Skip lines with null values
)

df_assembled = assembler.transform(df_encoded)


# Select only the features and target variable
df_model_data = df_assembled.select(col("features"), col(target_col).alias("label"))

# Split the data into training and testing sets
train_data, test_data = df_model_data.randomSplit([0.8, 0.2], seed=42)

# Initialize the regression model
rf = RandomForestRegressor(featuresCol="features", labelCol="label", maxBins=250)

# Train the model
rf_model = rf.fit(train_data)

# Make predictions on the test data
rf_predictions = rf_model.transform(test_data)

24/01/15 05:07:45 WARN MemoryStore: Not enough space to cache rdd_709_3 in memory! (computed 13.7 MiB so far)
24/01/15 05:07:45 WARN MemoryStore: Not enough space to cache rdd_709_7 in memory! (computed 9.0 MiB so far)
24/01/15 05:07:45 WARN MemoryStore: Not enough space to cache rdd_709_4 in memory! (computed 13.7 MiB so far)
24/01/15 05:07:45 WARN BlockManager: Persisting block rdd_709_7 to disk instead.
24/01/15 05:07:45 WARN BlockManager: Persisting block rdd_709_4 to disk instead.
24/01/15 05:07:45 WARN BlockManager: Persisting block rdd_709_3 to disk instead.
24/01/15 05:07:45 WARN MemoryStore: Not enough space to cache rdd_709_0 in memory! (computed 13.7 MiB so far)
24/01/15 05:07:45 WARN BlockManager: Persisting block rdd_709_0 to disk instead.
24/01/15 05:07:45 WARN MemoryStore: Not enough space to cache rdd_709_5 in memory! (computed 13.7 MiB so far)
24/01/15 05:07:45 WARN BlockManager: Persisting block rdd_709_5 to disk instead.
24/01/15 05:07:45 WARN MemoryStore: Not enough

In [16]:
# Function to evaluate the model
def eval_model(predictions, metrics = ['rmse', 'mae', 'mse', 'r2']):
    for m in metrics:
        evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName=m)
        metric = evaluator.evaluate(predictions)
        print(f" {m} on test data: {metric}")

In [17]:
# RandomForestRegressor evaluation
eval_model(rf_predictions)

                                                                                

 rmse on test data: 16.1142797385221


                                                                                

 mae on test data: 8.771058928398741


                                                                                

 mse on test data: 259.6700114913439




 r2 on test data: 0.6278799780508133


                                                                                

1. **Root Mean Squared Error (RMSE):** Standard deviation of the residuals (prediction errors : lower RMSE value = better fit to the data).
Here : Model's predictions deviate from about 16 minutes from the actual delay time.

2. **Mean Absolute Error (MAE):** Average magnitude of the errors in a set of predictions, without considering their direction (Average of the absolute differences between prediction and actual observation where all individual differences have equal weight : lower MAE = better).
RF model's MAE of almost 9 indicates that, on average, the absolute error of each prediction is around 9 minutes.

3. **Mean Squared Error (MSE):** Average of the squares of the errors—that is, the average squared difference between the estimated values and the actual value (Risk metric corresponding to the expected value of the squared (quadratic) error or loss : Low MSE = good model).
RF model's MSE is about 260, which aligns with the RMSE (since RMSE is the square root of MSE).

4. **R-squared (R²):** Statistical measure that represents the proportion of the variance for the dependent variable that's explained by the independent variables in a regression model (Value range from 0 to 1 : High value = higher proportion of variance accounted for by the model). 
RF model's R² value of approximately 0.62 suggests that about 62% of the variance in your dependent variable is predictable from the independent variables.

### Hyperparameter Tunning and Crossvalidation

In [18]:
# RandomForestRegressor Hyperparameter tuning
rf_paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 15, 20]) \
    .addGrid(rf.maxDepth, [5, 8, 12]) \
    .addGrid(rf.maxBins, [250, 300, 350]) \
    .build()

# CrossValidator
crossval_rf = CrossValidator(estimator=rf,
                             estimatorParamMaps=rf_paramGrid,
                             evaluator=RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse"),
                             numFolds=3)

# Fit the model using CrossValidator
cv_rf_model = crossval_rf.fit(train_data)
rf_best_model = cv_rf_model.bestModel
best_rf_predictions = rf_best_model.transform(test_data)

24/01/15 05:13:31 WARN DAGScheduler: Broadcasting large task binary with size 1086.0 KiB
24/01/15 05:13:45 WARN DAGScheduler: Broadcasting large task binary with size 1087.6 KiB
24/01/15 05:13:58 WARN DAGScheduler: Broadcasting large task binary with size 1076.0 KiB
24/01/15 05:14:13 WARN DAGScheduler: Broadcasting large task binary with size 1086.0 KiB
24/01/15 05:14:15 WARN DAGScheduler: Broadcasting large task binary with size 1825.3 KiB
24/01/15 05:14:19 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
24/01/15 05:14:26 ERROR Executor: Exception in task 4.0 in stage 546.0 (TID 2136)
java.lang.OutOfMemoryError: Java heap space
24/01/15 05:14:26 ERROR Executor: Exception in task 1.0 in stage 546.0 (TID 2133)
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22(RandomForest.scala:651)
	at org.apache.spark.ml.tree.im

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
# Evaluate the best rf model
eval_model(best_rf_predictions)

Hyperparameter tuning requires to much computational power and our personal machine and the VM provided by the course are not enough to run the code. We will therefore skip this part for now.

### GBT Model Training

In [19]:
# GBTRegressor for comparison
gbt = GBTRegressor(featuresCol="features", labelCol="label", maxBins=250)
gbt_model = gbt.fit(train_data)
predictions_gbt = gbt_model.transform(test_data)

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
# Evaluate GBTRegressor predictions 
eval_model(predictions_gbt)

### Finish Session

In [None]:
# Close context
spark.stop()