In [None]:
 !wget https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.24%2B8/OpenJDK11U-jdk_x64_linux_hotspot_11.0.24_8.tar.gz
!tar xzf OpenJDK11U-jdk_x64_linux_hotspot_11.0.24_8.tar.gz
!rm OpenJDK11U-jdk_x64_linux_hotspot_11.0.24_8.tar.gz
!wget https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
!tar xzf spark-3.5.2-bin-hadoop3.tgz
!rm spark-3.5.2-bin-hadoop3.tgz
!pip install findspark

In [None]:
import os
import sys
import subprocess

# The JDK and Spark archives are unpacked into the notebook's working
# directory, so every path below is derived from it.
working_dir = os.getcwd()
print(working_dir)

# Trailing separators are kept so the exported values match what the notebook
# has always set for these variables.
os.environ["JAVA_HOME"] = os.path.join(working_dir, "jdk-11.0.24+8", "")
os.environ["SPARK_HOME"] = os.path.join(working_dir, "spark-3.5.2-bin-hadoop3", "")
spark_path = os.environ['SPARK_HOME']

# Locations inside the Spark distribution that must be importable.
# os.path.join avoids the doubled "//" that plain string concatenation produced
# (SPARK_HOME already ends with a separator), and the membership check keeps
# sys.path free of duplicates when this cell is re-run.
for _relative in (
    "bin",
    "python",
    os.path.join("python", "pyspark", ""),
    os.path.join("python", "lib"),
    os.path.join("python", "lib", "pyspark.zip"),
    os.path.join("python", "lib", "py4j-0.10.9.7-src.zip"),
):
    _entry = os.path.join(spark_path, _relative)
    if _entry not in sys.path:
        sys.path.append(_entry)

# Resources handed to the local Spark master / driver when the context is built.
number_cores = 2
memory_gb = 6

In [None]:
import findspark
import pyspark

# Make the Spark installation importable for this kernel.
findspark.init()

# Run Spark locally with the core count and driver memory chosen above.
conf = (
    pyspark.SparkConf()
    .setMaster('local[{}]'.format(number_cores))
    .set('spark.driver.memory', '{}g'.format(memory_gb))
)

# Only one SparkContext may be active at a time, so constructing a new one
# fails when this cell is executed twice. getOrCreate returns the running
# context if there is one and otherwise creates it from `conf`.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

In [None]:
# Installs pybaseball for Statcast data
!pip install pybaseball --quiet

# Imports Statcast scraper
from pybaseball import statcast
import pandas as pd

print("Downloading 2022 data...")
data_2022 = statcast(start_dt="2022-04-07", end_dt="2022-10-02")

print("Downloading 2023 data...")
data_2023 = statcast(start_dt="2023-03-30", end_dt="2023-10-01")

print("Downloading 2024 data... ")
data_2024 = statcast(start_dt="2024-03-28", end_dt="2024-09-29")

combined = pd.concat([data_2022, data_2023, data_2024])

# Saving the data to a CSV may take time (took me around 13 minutes))
combined.to_csv("statcast_2022_2024.csv", index=False)
print("Saved as statcast_2022_2024.csv")

# Preview row count (Spoiler alert: it should be 2,121,390)
print(f"Total rows: {combined.shape[0]}")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count

# Wrap the existing SparkContext in a session so the DataFrame API is available.
spark = SparkSession(sc)

# Read the combined Statcast export; the first row holds the column names and
# column types are inferred from the data.
statcast_csv = "statcast_2022_2024.csv"
df = spark.read.csv(statcast_csv, header=True, inferSchema=True)

# Quick sanity checks: schema, row count, and a small sample.
df.printSchema()
row_total = df.count()
print(f"Total rows: {row_total}")
df.show(5)

In [None]:
# Remove the raw name column; a batter name is attached later from an ID lookup.
columns_to_drop = ["player_name"]
df_cleaned = df.drop(*columns_to_drop)

In [None]:
from pybaseball import playerid_reverse_lookup
import pandas as pd

# Collect the distinct batter IDs from Spark into a plain Python list of ints.
distinct_batters = df_cleaned.select("batter").distinct().toPandas()
batter_ids = distinct_batters["batter"].dropna().astype(int).tolist()

# Resolve the MLBAM IDs to player records.
id_records = playerid_reverse_lookup(batter_ids, key_type='mlbam')

# Keep just the join key and a "first last" display name, using the column
# names the Spark join expects.
lookup_df = pd.DataFrame(
    {
        'batter': id_records['key_mlbam'],
        'batter_name': id_records['name_first'] + ' ' + id_records['name_last'],
    }
)

In [None]:
# Move the pandas lookup table into Spark so it can be joined.
lookup_spark_df = spark.createDataFrame(lookup_df)

# Reuse df_cleaned (the Statcast frame with "player_name" already dropped)
# instead of dropping the column a second time. The left join keeps every
# pitch row even when a batter ID has no entry in the lookup.
df_with_names = df_cleaned.join(lookup_spark_df, on="batter", how="left")

## Filter and Aggregate Hitter Statistics

In this step, I filtered for batted balls with complete tracking data: exit velocity (launch_speed), launch angle (launch_angle), and estimated hit distance (hit_distance_sc). Then I grouped by batter and calculated the average of each metric. To reduce noise from small sample sizes, I also filtered out hitters with fewer than 50 batted ball events.


In [None]:
# Hitters with fewer batted-ball events than this are excluded from the results.
MIN_BATTED_BALLS = 50

# Keep only rows with complete tracking data. hit_distance_sc is checked along
# with launch_speed and launch_angle so that all three averages below, and the
# batted-ball count, are computed over the same set of rows. Rows without a
# recorded event are excluded as well.
batted_balls = df_with_names.filter(
    col("launch_speed").isNotNull()
    & col("launch_angle").isNotNull()
    & col("hit_distance_sc").isNotNull()
    & col("events").isNotNull()
)

# One row per hitter: mean of each tracking metric plus the sample size,
# restricted to hitters with at least MIN_BATTED_BALLS events.
hitter_stats = (
    batted_balls.groupBy("batter", "batter_name")
    .agg(
        avg("launch_speed").alias("avg_exit_velocity"),
        avg("launch_angle").alias("avg_launch_angle"),
        avg("hit_distance_sc").alias("avg_hit_distance"),
        count("*").alias("num_batted_balls")
    )
    .filter(col("num_batted_balls") >= MIN_BATTED_BALLS)
)

## Vectorization, Scaling, and K-Means Clustering

I used Spark's MLlib to build a pipeline that performs the following:
1. Combines the hitting stats into a feature vector
2. Standardizes the features using StandardScaler
3. Applies K-Means clustering with k = 4 to segment hitters into groups based on their hitting profiles

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Per-hitter averages that make up the clustering feature vector.
feature_columns = ["avg_exit_velocity", "avg_launch_angle", "avg_hit_distance"]

# Stage 1: pack the feature columns into a single vector column.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Stage 2: standardize that vector (centering and unit-variance scaling enabled).
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="features",
    outputCol="scaled_features",
)

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

# Stage 3: K-Means on the standardized features, with a fixed seed.
kmeans = KMeans(
    k=4,
    seed=42,
    featuresCol="scaled_features",
    predictionCol="cluster",
)

# Chain assemble -> scale -> cluster and fit all stages on the hitter table.
pipeline = Pipeline(stages=[assembler, scaler, kmeans])
model = pipeline.fit(hitter_stats)

# Score the same hitters and keep only the readable columns plus the cluster label.
report_columns = [
    "batter_name",
    "avg_exit_velocity",
    "avg_launch_angle",
    "avg_hit_distance",
    "num_batted_balls",
    "cluster",
]
clustered_data = model.transform(hitter_stats).select(*report_columns)

clustered_data.show(20)

+------------------+-----------------+------------------+------------------+----------------+-------+
|       batter_name|avg_exit_velocity|  avg_launch_angle|  avg_hit_distance|num_batted_balls|cluster|
+------------------+-----------------+------------------+------------------+----------------+-------+
|   yasmani grandal|89.15042979942692|11.091690544412607|159.23638968481376|             698|      1|
|      yoán moncada|88.85538752362955|13.655954631379963|172.99432892249527|             529|      0|
|   geraldo perdomo|82.55010416666667|12.076041666666667| 147.0917622523462|             960|      3|
|    zach mckinstry|86.80791366906475|16.610071942446044| 185.5251798561151|             695|      0|
|      heliot ramos|90.97135549872125|10.398976982097187|164.33333333333334|             391|      1|
|     michael busch|89.66396866840729|15.631853785900784|171.84595300261097|             383|      0|
|      brett wisely|84.53505535055349|14.531365313653136|165.07835820895522|      