In [1]:
from pyspark.sql.functions import col, split, concat_ws, when, size, min, trim, round, mean, lit

In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("TrackTimePrediction") \
    .getOrCreate()

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/13 18:00:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
spark.stop()

Machine learning time

In [4]:
file_path1 = "./final_database/merged_half_thresh.csv"
df = spark.read.csv(file_path1, header=True, inferSchema=True)
print(df.count())
df.show(1)

2116
+-------------+-----+----------+--------------------+-----+-----+------+-----+-----+------+------+-------+-------+-------+-------+-------+-------+-------+-------+------+------+------+------+-------+-----------------+
| athlete_name|  _c0|Unnamed: 0|      Athlete/School|Grade|State|Gender|FR_5k|SO_5k| JR_5k| SR_5k|FR_3200|SO_3200|JR_3200|SR_3200|FR_1600|SO_1600|JR_1600|SR_1600|FR_800|SO_800|JR_800|SR_800|     id|       PR_seconds|
+-------------+-----+----------+--------------------+-----+-----+------+-----+-----+------+------+-------+-------+-------+-------+-------+-------+-------+-------+------+------+------+------+-------+-----------------+
|Camron Gaddis|66230|      3024|Camron Gaddis Col...| 2022|   GA|  Boys| NULL| NULL|1093.4|1037.4|   NULL|  683.8|   NULL|  623.9|   NULL|  303.4|  294.6|  287.2|  NULL| 138.5|  NULL| 126.5|8265308|932.5400009155273|
+-------------+-----+----------+--------------------+-----+-----+------+-----+-----+------+------+-------+-------+-------+-----

To deal with NULLs, will try imputing with an indicator column, and simply setting the NULLs to a very large value. The first is more robust, but I worry about setting nulls to the mean since I am hoping to capture trends

In [12]:
df = df.fillna(9999999, subset=['FR_5k', 'SO_5k', 'JR_5k', 'SR_5k', 'FR_3200', 'SO_3200', 'JR_3200', 'SR_3200', 'FR_1600', 'SO_1600', 'JR_1600', 'SR_1600', 'FR_800', 'SO_800', 'JR_800', 'SR_800'])
df.show(5)

+-------------+-----+----------+--------------------+-----+-----+------+---------+---------+-----------------+-----------------+---------+-------+---------+-------+---------+---------+---------+-------+---------+---------+---------+---------+-------+------------------+
| athlete_name|  _c0|Unnamed: 0|      Athlete/School|Grade|State|Gender|    FR_5k|    SO_5k|            JR_5k|            SR_5k|  FR_3200|SO_3200|  JR_3200|SR_3200|  FR_1600|  SO_1600|  JR_1600|SR_1600|   FR_800|   SO_800|   JR_800|   SR_800|     id|        PR_seconds|
+-------------+-----+----------+--------------------+-----+-----+------+---------+---------+-----------------+-----------------+---------+-------+---------+-------+---------+---------+---------+-------+---------+---------+---------+---------+-------+------------------+
|Camron Gaddis|66230|      3024|Camron Gaddis Col...| 2022|   GA|  Boys|9999999.0|9999999.0|           1093.4|           1037.4|9999999.0|  683.8|9999999.0|  623.9|9999999.0|    303.4|    29

In [13]:
feature_cols = ['FR_5k', 'SO_5k', 'JR_5k', 'SR_5k', 'FR_3200', 'SO_3200', 'JR_3200', 'SR_3200', 'FR_1600', 'SO_1600', 'JR_1600', 'SR_1600', 'FR_800', 'SO_800', 'JR_800', 'SR_800']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid='keep')
model_df = assembler.transform(df)
model_df.select('features', 'PR_seconds').show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+
|features                                                                                                                                                   |PR_seconds        |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+
|[9999999.0,9999999.0,1093.4,1037.4,9999999.0,683.8,9999999.0,623.9,9999999.0,303.4,294.6,287.2,9999999.0,138.5,9999999.0,126.5]                            |932.5400009155273 |
|[984.3,995.1,978.2,941.3,585.8,594.9,557.2,573.5,269.6,268.8,257.3,268.7,127.1,9999999.0,118.4,9999999.0]                                                  |872.3899993896484 |
|[9999999.0,1012.77,956.8666666666668,933.9266666666666,638.2,589.3,575.9,567.9,9999999.0,9999999.0,9999999.0,262.2

In [29]:
train_data, test_data = model_df.randomSplit(weights=[0.8, 0.2], seed=27)
print(f"Training Dataset Count: {train_data.count()}")
print(f"Test Dataset Count: {test_data.count()}")

Training Dataset Count: 1683
Test Dataset Count: 433


In [30]:
gbt = GBTRegressor(featuresCol='features', labelCol='PR_seconds')
print("Training the GBT model...")
gbt_model = gbt.fit(train_data)
print("Training complete.")

Training the GBT model...
Training complete.


In [31]:
predictions = gbt_model.transform(test_data)
predictions.select("PR_seconds", "prediction").show(10)

+-----------------+-----------------+
|       PR_seconds|       prediction|
+-----------------+-----------------+
|941.0400009155273|972.0809625750252|
|959.6399993896484|971.3641698728927|
|923.2700004577637|901.4157980013447|
|830.5099983215332|839.7774970076775|
|859.1499996185303|877.3700074587806|
|861.7199993133545|907.6944414253484|
|898.8899993896484|842.3040684579439|
|853.1199998855591|868.3939686703186|
|957.2299995422363| 878.363313652297|
|883.3499984741211|866.2991131107171|
+-----------------+-----------------+
only showing top 10 rows


In [32]:
evaluator = RegressionEvaluator(
    labelCol="PR_seconds",
    predictionCol="prediction",
    metricName="rmse"  
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse:.2f} seconds")
r2_evaluator = evaluator.setMetricName("r2")
r2 = r2_evaluator.evaluate(predictions)
print(f"R-squared (R2) on test data = {r2:.2f}")


Root Mean Squared Error (RMSE) on test data = 62.94 seconds
R-squared (R2) on test data = 0.05


25/06/13 13:10:53 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 998112 ms exceeds timeout 120000 ms
25/06/13 13:10:53 WARN SparkContext: Killing executors is not supported by current scheduler.
25/06/13 13:10:53 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

This model sucks. It is on average 62.94 seconds off. Will try to impute with indicators next. Then add more features like progression, but realistically, I just don't have enough data yet to make a broad enough model to account for all the factors at play. Will also try using the dataset with a lower threshold.

In [20]:
mean_val = df.select(mean(col("SR_800"))).first()[0]
df = df.withColumn(
    "SR_800_was_missing",
    when(col("SR_800").isNull(), 1).otherwise(0)
)
df = df.fillna(mean_val, subset=["SR_800"])
df.show(1)

+-------------+-----+----------+--------------------+-----+-----+------+------------------+----------------+------+------+-----------------+-------+-----------------+-------+-----------------+-------+-------+-------+-----------------+------+------------------+------+-------+-----------------+-----------------+-----------------+-----------------+-----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+
| athlete_name|  _c0|Unnamed: 0|      Athlete/School|Grade|State|Gender|             FR_5k|           SO_5k| JR_5k| SR_5k|          FR_3200|SO_3200|          JR_3200|SR_3200|          FR_1600|SO_1600|JR_1600|SR_1600|           FR_800|SO_800|            JR_800|SR_800|     id|       PR_seconds|FR_5k_was_missing|SO_5k_was_missing|JR_5k_was_missing|SR_5k_was_missing|FR_3200_was_missing|SO_3200_was_m

In [21]:
output_path = './final_database'
df.coalesce(1).write \
    .option('header', 'true') \
    .mode('append') \
    .csv(output_path) 

In [22]:
feature_cols = ['FR_5k', 'SO_5k', 'JR_5k', 'SR_5k', 'FR_3200', 'SO_3200', 'JR_3200', 'SR_3200', 'FR_1600', 'SO_1600', 'JR_1600', 'SR_1600', 'FR_800', 'SO_800', 'JR_800', 'SR_800', 'FR_5k_was_missing', 'SO_5k_was_missing', 'JR_5k_was_missing', 'SR_5k_was_missing', 'FR_3200_was_missing', 'SO_3200_was_missing', 'JR_3200_was_missing', 'SR_3200_was_missing', 'FR_1600_was_missing', 'SO_1600_was_missing', 'JR_1600_was_missing', 'SR_1600_was_missing', 'FR_800_was_missing', 'SO_800_was_missing', 'JR_800_was_missing', 'SR_800_was_missing']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid='keep')
model_df = assembler.transform(df)
model_df.select('features', 'PR_seconds').show(5, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+
|features                                                                                                                                                                                                                                                                                        |PR_seconds        |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+
|[1020.0831104356615,998.493021612634,1093.4,1037.4,624.0475238095241,

In [23]:
train_data, test_data = model_df.randomSplit(weights=[0.8, 0.2], seed=27)
print(f"Training Dataset Count: {train_data.count()}")
print(f"Test Dataset Count: {test_data.count()}")

Training Dataset Count: 1683
Test Dataset Count: 433


In [24]:
gbt = GBTRegressor(featuresCol='features', labelCol='PR_seconds')
print("Training the GBT model...")
gbt_model = gbt.fit(train_data)
print("Training complete.")

Training the GBT model...
Training complete.


In [25]:
predictions = gbt_model.transform(test_data)
predictions.select("PR_seconds", "prediction").show(10)

+-----------------+-----------------+
|       PR_seconds|       prediction|
+-----------------+-----------------+
|941.0400009155273|986.9181956179062|
|959.6399993896484| 946.683472043823|
|923.2700004577637|886.2761575828721|
|830.5099983215332|886.7246987618596|
|859.1499996185303|850.1800357969346|
|861.7199993133545|913.8666745485638|
|898.8899993896484|907.8070305149795|
|853.1199998855591|914.4643472743517|
|957.2299995422363|913.7578695034982|
|883.3499984741211|867.5338160009733|
+-----------------+-----------------+
only showing top 10 rows


25/06/13 18:12:35 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [26]:
evaluator = RegressionEvaluator(
    labelCol="PR_seconds",
    predictionCol="prediction",
    metricName="rmse"  
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse:.2f} seconds")
r2_evaluator = evaluator.setMetricName("r2")
r2 = r2_evaluator.evaluate(predictions)
print(f"R-squared (R2) on test data = {r2:.2f}")

Root Mean Squared Error (RMSE) on test data = 74.15 seconds
R-squared (R2) on test data = -0.32


Imputing with the mean and marking which rows were missing was worse: 74.15 seconds off on average. Will test the other data set next and add new features soon.