In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,year,month,regexp_extract

#initilaise spark session
spark = SparkSession.builder.appName("UltramarathonAnalysis").getOrCreate()

#loading the dataset
file_path = "/content/TWO_CENTURIES_OF_UM_RACES"
df = spark.read.csv(file_path, header =True, inferSchema=True)

In [2]:
df.printSchema()

root
 |-- Year of event: integer (nullable = true)
 |-- Event dates: string (nullable = true)
 |-- Event name: string (nullable = true)
 |-- Event distance/length: string (nullable = true)
 |-- Event number of finishers: integer (nullable = true)
 |-- Athlete performance: string (nullable = true)
 |-- Athlete club: string (nullable = true)
 |-- Athlete country: string (nullable = true)
 |-- Athlete year of birth: double (nullable = true)
 |-- Athlete gender: string (nullable = true)
 |-- Athlete age category: string (nullable = true)
 |-- Athlete average speed: string (nullable = true)
 |-- Athlete ID: integer (nullable = true)



In [3]:
df.show(5)

+-------------+-----------+-------------------+---------------------+-------------------------+-------------------+------------------+---------------+---------------------+--------------+--------------------+---------------------+----------+
|Year of event|Event dates|         Event name|Event distance/length|Event number of finishers|Athlete performance|      Athlete club|Athlete country|Athlete year of birth|Athlete gender|Athlete age category|Athlete average speed|Athlete ID|
+-------------+-----------+-------------------+---------------------+-------------------------+-------------------+------------------+---------------+---------------------+--------------+--------------------+---------------------+----------+
|         2018| 06.01.2018|Selva Costera (CHI)|                 50km|                       22|          4:51:39 h|             Tnfrc|            CHI|               1978.0|             M|                 M35|               10.286|         0|
|         2018| 06.01.2018|Selva

In [6]:
# Extract Year & Month from Event Dates
df = df.withColumn("event_year", year(col("Event dates")))
df = df.withColumn("event_month", month(col("Event dates")))


In [7]:
# Extract Country from Event Name (Assuming last word is country)
df = df.withColumn("event_country", regexp_extract(col("Event name"), "(\\w+)$", 1))


In [9]:
from pyspark.sql.functions import to_date

# Convert Event dates to DateType
df = df.withColumn("event_date", to_date(col("Event dates"), "yyyy-MM-dd"))  # Adjust format if necessary

# Extract Year & Month
df = df.withColumn("event_year", year(col("event_date")))
df = df.withColumn("event_month", month(col("event_date")))

# Show transformed dataset
df.select("event_date", "event_year", "event_month", "event_country").show(5)


+----------+----------+-----------+-------------+
|event_date|event_year|event_month|event_country|
+----------+----------+-----------+-------------+
|      NULL|      NULL|       NULL|             |
|      NULL|      NULL|       NULL|             |
|      NULL|      NULL|       NULL|             |
|      NULL|      NULL|       NULL|             |
|      NULL|      NULL|       NULL|             |
+----------+----------+-----------+-------------+
only showing top 5 rows



In [10]:
df.select("Event dates").show(10, False)


+-----------+
|Event dates|
+-----------+
|06.01.2018 |
|06.01.2018 |
|06.01.2018 |
|06.01.2018 |
|06.01.2018 |
|06.01.2018 |
|06.01.2018 |
|06.01.2018 |
|06.01.2018 |
|06.01.2018 |
+-----------+
only showing top 10 rows



In [11]:
from pyspark.sql.functions import to_date, col, year, month

# Convert "Event dates" from "DD.MM.YYYY" format to a proper date type
df = df.withColumn("event_date", to_date(col("Event dates"), "dd.MM.yyyy"))

# Extract Year & Month
df = df.withColumn("event_year", year(col("event_date")))
df = df.withColumn("event_month", month(col("event_date")))

# Show results
df.select("Event dates", "event_date", "event_year", "event_month").show(10, False)


+-----------+----------+----------+-----------+
|Event dates|event_date|event_year|event_month|
+-----------+----------+----------+-----------+
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
+-----------+----------+----------+-----------+
only showing top 10 rows



In [13]:
from pyspark.sql.functions import to_date, col, year, month, when, split

# Split the "Event dates" column based on the hyphen
df = df.withColumn("event_date_split", split(col("Event dates"), "-"))

# Handle cases with a single date and date ranges
df = df.withColumn(
    "event_date",
    when(
        col("event_date_split").getItem(1).isNull(),  # Check if it's a single date
        to_date(col("event_date_split").getItem(0), "dd.MM.yyyy")
    ).otherwise(
        to_date(col("event_date_split").getItem(0), "dd.MM.yyyy")  # Take the first date in a range
    )
)

# Extract Year & Month
df = df.withColumn("event_year", year(col("event_date")))
df = df.withColumn("event_month", month(col("event_date")))

# Show results
df.select("Event dates", "event_date", "event_year", "event_month").show(10, False)

+-----------+----------+----------+-----------+
|Event dates|event_date|event_year|event_month|
+-----------+----------+----------+-----------+
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
|06.01.2018 |2018-01-06|2018      |1          |
+-----------+----------+----------+-----------+
only showing top 10 rows



In [14]:
# How has partcipation in ultramarathons evolved over time?

from pyspark.sql.functions import count

# Count races per year
df.groupby("event_year").agg(count("*").alias("num_races")).orderBy("event_year").show(10)

+----------+---------+
|event_year|num_races|
+----------+---------+
|      NULL|   448893|
|      2016|   414913|
|      2017|   457176|
|      2018|   502394|
|      2019|    76161|
+----------+---------+



In [15]:
#What is the gender distribution in ultramarathons?

from pyspark.sql.functions import countDistinct

#Count male vs female participants
df.groupBy("Athlete gender").agg(countDistinct("Athlete ID").alias("num_athletes")).show()

+--------------+------------+
|Athlete gender|num_athletes|
+--------------+------------+
|             F|      157220|
|             M|      511180|
|             X|           3|
+--------------+------------+



In [16]:
#What is the average age of ultramarathon runners?

from pyspark.sql.functions import avg

#Calculate average age
df.withColumn("Athlete age", col("event_year") - col("Athlete year of birth"))\
.select(avg("Athlete age")).show()

+-----------------+
| avg(Athlete age)|
+-----------------+
|42.29559244359424|
+-----------------+



In [20]:
#What are the most common race distances

df.groupBy("Event distance/length").count().orderBy("count", ascending=False).show()

+---------------------+------+
|Event distance/length| count|
+---------------------+------+
|                 50km|433491|
|                100km|206052|
|                 50mi| 66118|
|                 56km| 55867|
|                 45km| 49423|
|                 60km| 47522|
|                 55km| 41887|
|                  24h| 37473|
|                100mi| 36649|
|                 70km| 31044|
|                  12h| 30706|
|                   6h| 30005|
|                 52km| 30001|
|                 48km| 26807|
|                 80km| 26471|
|                 65km| 24795|
|                 90km| 23419|
|                 54km| 20838|
|                 53km| 20755|
|                 46km| 19213|
+---------------------+------+
only showing top 20 rows



In [21]:
#Which countries have the fastest runners on average?

df.groupBy("Athlete country").agg(avg("Athlete average speed").alias("avg_speed")) \
.orderBy("avg_speed", ascending=False).show(10)

+---------------+------------------+
|Athlete country|         avg_speed|
+---------------+------------------+
|            ETH| 13.00621153846154|
|            LES| 11.45335761589404|
|            MAW|      11.068765625|
|            KEN|10.388723502304144|
|            MOZ|10.021866666666666|
|            ZIM| 9.977878612716767|
|            UGA| 9.967936170212766|
|            RWA| 9.777249999999999|
|            SWZ| 9.401596330275227|
|            LBA|             9.359|
+---------------+------------------+
only showing top 10 rows



In [22]:
from pyspark.sql.functions import avg

# Average speed over the years
df.groupBy("event_year").agg(avg("Athlete average speed").alias("avg_speed")).orderBy("event_year").show(10)


+----------+-----------------+
|event_year|        avg_speed|
+----------+-----------------+
|      NULL|5.494823447313192|
|      2016|7.358469770536641|
|      2017|7.235842806466417|
|      2018|7.133135042915782|
|      2019|7.691136713980048|
+----------+-----------------+



In [23]:
#At what age do ultramarathon runners perform best?
df = df.withColumn("Athlete age", col("event_year") - col("Athlete year of birth"))

# Group by age and get average speed
df.groupBy("Athlete age").agg(avg("Athlete average speed").alias("avg_speed")).orderBy("Athlete age").show()


+-----------+------------------+
|Athlete age|         avg_speed|
+-----------+------------------+
|       NULL| 5.772467590545499|
|        0.0|             4.799|
|        2.0|             6.081|
|        3.0|             4.833|
|        5.0|             6.466|
|        6.0|             7.665|
|        7.0|6.1240000000000006|
|        8.0|           4.83725|
|        9.0|4.9281999999999995|
|       10.0| 5.466272727272727|
|       11.0| 5.249642857142855|
|       12.0| 5.959541666666669|
|       13.0| 5.688461538461539|
|       14.0| 6.281279069767442|
|       15.0| 6.574620967741936|
|       16.0| 6.804905594405593|
|       17.0| 7.203809069212412|
|       18.0| 6.834027677496997|
|       19.0|6.9493510082150864|
|       20.0|7.3671006914433965|
+-----------+------------------+
only showing top 20 rows



PREDICTING RACE PERFORMANCE


In [24]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression

# Convert categorical columns to numerical
indexer_gender = StringIndexer(inputCol="Athlete gender", outputCol="gender_index")
indexer_race = StringIndexer(inputCol="Event distance/length", outputCol="race_index")

df = indexer_gender.fit(df).transform(df)
df = indexer_race.fit(df).transform(df)

# Create a feature vector
feature_cols = ["Athlete age", "gender_index", "race_index"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df).select("features", col("Athlete average speed").alias("label"))


In [30]:
df = df.withColumn("label", col("label").cast("double"))


In [31]:
df.printSchema()
df.show(5)


root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)

+--------------+------+
|      features| label|
+--------------+------+
|[40.0,0.0,0.0]|10.286|
|[37.0,0.0,0.0]| 9.501|
|[31.0,0.0,0.0]| 9.472|
|[42.0,0.0,0.0]| 8.976|
|[26.0,0.0,0.0]| 8.469|
+--------------+------+
only showing top 5 rows



In [32]:
# Split data into 80% training and 20% testing
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)


In [36]:
df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)



In [40]:
from pyspark.ml.functions import vector_to_array

# Convert vector column to an array
df = df.withColumn("features_array", vector_to_array("features"))

# Check feature indices (assuming the order was ["Athlete age", "gender_index", "race_index"])
df = df.withColumn("Athlete age", df["features_array"][0])
df = df.withColumn("gender_index", df["features_array"][1])
df = df.withColumn("race_index", df["features_array"][2])

# Show the extracted columns
df.select("Athlete age", "gender_index", "race_index", "label").show(5)


+-----------+------------+----------+------+
|Athlete age|gender_index|race_index| label|
+-----------+------------+----------+------+
|       40.0|         0.0|       0.0|10.286|
|       37.0|         0.0|       0.0| 9.501|
|       31.0|         0.0|       0.0| 9.472|
|       42.0|         0.0|       0.0| 8.976|
|       26.0|         0.0|       0.0| 8.469|
+-----------+------------+----------+------+
only showing top 5 rows



In [41]:
print(df.columns)


['features', 'label', 'features_array', 'Athlete age', 'gender_index', 'race_index']


In [42]:
df = df.withColumnRenamed("Athlete_Age", "Athlete age")


In [45]:
print(df.columns)


['features', 'label', 'features_array', 'Athlete age', 'gender_index', 'race_index']


In [46]:
df_raw = spark.read.csv("/content/TWO_CENTURIES_OF_UM_RACES", header=True, inferSchema=True)
df_raw.printSchema()


root
 |-- Year of event: integer (nullable = true)
 |-- Event dates: string (nullable = true)
 |-- Event name: string (nullable = true)
 |-- Event distance/length: string (nullable = true)
 |-- Event number of finishers: integer (nullable = true)
 |-- Athlete performance: string (nullable = true)
 |-- Athlete club: string (nullable = true)
 |-- Athlete country: string (nullable = true)
 |-- Athlete year of birth: double (nullable = true)
 |-- Athlete gender: string (nullable = true)
 |-- Athlete age category: string (nullable = true)
 |-- Athlete average speed: string (nullable = true)
 |-- Athlete ID: integer (nullable = true)



In [48]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,year,month,regexp_extract, to_date, when, split, avg
from pyspark.sql.functions import count, countDistinct
from pyspark.ml.feature import StringIndexer, VectorAssembler

#initialize spark session
spark = SparkSession.builder.appName("UltramarathonAnalysis").getOrCreate()

#loading the dataset
file_path = "/content/TWO_CENTURIES_OF_UM_RACES"
df_raw = spark.read.csv(file_path, header =True, inferSchema=True)

# Convert "Event dates" to a proper date type and extract year and month
df_raw = df_raw.withColumn("event_date_split", split(col("Event dates"), "-"))
df_raw = df_raw.withColumn(
    "event_date",
    when(
        col("event_date_split").getItem(1).isNull(),  # Check if it's a single date
        to_date(col("event_date_split").getItem(0), "dd.MM.yyyy")
    ).otherwise(
        to_date(col("event_date_split").getItem(0), "dd.MM.yyyy")  # Take the first date in a range
    )
)
df_raw = df_raw.withColumn("event_year", year(col("event_date")))
df_raw = df_raw.withColumn("event_month", month(col("event_date")))


# Calculate Athlete age
df_raw = df_raw.withColumn("Athlete age", col("event_year") - col("Athlete year of birth"))

# Convert categorical columns to numerical using StringIndexer
indexer_gender = StringIndexer(inputCol="Athlete gender", outputCol="gender_index")
indexer_race = StringIndexer(inputCol="Event distance/length", outputCol="race_index")

df_raw = indexer_gender.fit(df_raw).transform(df_raw)
df_raw = indexer_race.fit(df_raw).transform(df_raw)

# Assuming "Athlete average speed" is your label column
df = df_raw.select("Athlete age", "gender_index", "race_index", col("Athlete average speed").alias("label"))
df.show(5)

+-----------+------------+----------+------+
|Athlete age|gender_index|race_index| label|
+-----------+------------+----------+------+
|       40.0|         0.0|       0.0|10.286|
|       37.0|         0.0|       0.0| 9.501|
|       31.0|         0.0|       0.0| 9.472|
|       42.0|         0.0|       0.0| 8.976|
|       26.0|         0.0|       0.0| 8.469|
+-----------+------------+----------+------+
only showing top 5 rows



In [49]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
