# NCAA Stats Data Pipeline

This "pipeline" is a notebook used to set up NCAA data in our Databricks sandbox. It's largely a workaround, since we don't have access to DLT/jobs in our sandbox environment. For now, I'll just run the scripts manually like a peasant, but in real life this could be converted to
DLT pipelines, jobs, etc.

The steps in this notebook:
1. Set up the initial schema for landing NCAA data
1. Load raw data into Databricks
1. Run ETL scripts to clean up and transform data into a format suitable for analysis

## Setup
Run the cells in this section to set up your environment

In [None]:
# Set up module autoreload so edits to imported project modules are picked up
# without restarting the kernel (mode 2 reloads all modules before each cell runs)
%load_ext autoreload
%autoreload 2

In [None]:
# Load environment variables using dotenv
# NOTE(review): presumably these supply the Databricks settings read by Config in the next cell — confirm

from dotenv import load_dotenv

load_dotenv()

In [None]:
# Connect to the Databricks compute environment and expose the session as `spark`
from pyspark.sql import SparkSession
from ncaa_tournament_predictor.config import Config
from ncaa_tournament_predictor.databricks import get_databricks_spark_session

# The session is annotated as SparkSession purely to help editor tooling: DatabricksSession
# intellisense isn't very good, and so far DatabricksSession has proven compatible with SparkSession
databricks_profile = Config.databricks_profile()
spark: SparkSession = get_databricks_spark_session(databricks_profile)

In [None]:
# Run all cells above this one to setup your environment

## Schema Setup

Initial steps to create a Databricks schema for holding NCAA men's basketball data

In [None]:
# Create the ncaa_mens_basketball schema under object_computing.
# `if not exists` makes this cell safe to re-run.
spark.sql("create schema if not exists object_computing.ncaa_mens_basketball;")

## Raw Data Volumes
Set up volumes for holding raw data files from various external data sources (CSVs, text files, etc.)

In [None]:
# Make sure the volume that holds the raw Kaggle stats files exists

from ncaa_tournament_predictor import volumes

# Translate the volume into the identifier form used in SQL statements
raw_kaggle_stats_sql_object = volumes.as_sql_object(volumes.raw_kaggle_stats)
create_volume_statement = f"create volume if not exists {raw_kaggle_stats_sql_object}"
spark.sql(create_volume_statement)

In [None]:
# Copy raw data into the raw_kaggle_stats volume

import os
import posixpath

from ncaa_tournament_predictor import volumes

notebook_dir = os.path.abspath(os.getcwd())
kaggle_dataset_path = os.path.abspath(
    os.path.join(notebook_dir, "../datasets/kaggle_ncaa_stats")
)
# Resolve the remote volume root once instead of on every loop iteration
kaggle_volume_root = volumes.without_dbfs_protocol(volumes.raw_kaggle_stats)

# sorted() gives a deterministic upload order
for filename in sorted(os.listdir(kaggle_dataset_path)):
    local_file_path = os.path.join(kaggle_dataset_path, filename)
    # copyFromLocalToFs copies a single file, so skip any sub-directories
    if not os.path.isfile(local_file_path):
        continue
    spark.copyFromLocalToFs(
        local_path=local_file_path,
        # The destination is a remote path: always join with "/" regardless of
        # the local OS (os.path.join would produce backslashes on Windows)
        dest_path=posixpath.join(kaggle_volume_root, filename),
    )

In [None]:
# Load every Kaggle stats CSV from the raw volume and run it through the cleaning step
from ncaa_tournament_predictor import transformation, volumes

# Reader options: first row is a header, column types are inferred, schemas are merged
csv_read_options = {"header": True, "inferSchema": True, "mergeSchema": True}
kaggle_csv_reader = spark.read.format("csv").options(**csv_read_options)
raw_kaggle_stats = kaggle_csv_reader.load(volumes.raw_kaggle_stats)

cleaned_ncaa_data = transformation.get_cleaned_kaggle_stats(raw_kaggle_stats)

In [None]:
# Make sure the volume that holds the raw head-to-head files exists

from ncaa_tournament_predictor import volumes

create_head_to_head_volume_statement = f"create volume if not exists {volumes.as_sql_object(volumes.raw_head_to_head)}"
spark.sql(create_head_to_head_volume_statement)

In [None]:
# Copy raw data into the raw_head_to_head volume

import os
import posixpath

from ncaa_tournament_predictor import volumes

notebook_dir = os.path.abspath(os.getcwd())
head_to_head_dataset_path = os.path.abspath(
    os.path.join(notebook_dir, "../datasets/kenpom_head_to_head")
)
# Resolve the remote volume root once instead of on every loop iteration
head_to_head_volume_root = volumes.without_dbfs_protocol(volumes.raw_head_to_head)

# sorted() gives a deterministic upload order
for filename in sorted(os.listdir(head_to_head_dataset_path)):
    local_file_path = os.path.join(head_to_head_dataset_path, filename)
    # copyFromLocalToFs copies a single file, so skip any sub-directories
    if not os.path.isfile(local_file_path):
        continue
    spark.copyFromLocalToFs(
        local_path=local_file_path,
        # The destination is a remote path: always join with "/" regardless of
        # the local OS (os.path.join would produce backslashes on Windows)
        dest_path=posixpath.join(head_to_head_volume_root, filename),
    )

## Data Cleanup & Transformation
Process the raw data, clean it up, and transform it for analysis

In [None]:
# Create the cleaned Kaggle datasets table by running the kaggle_stats job
# NOTE(review): presumably this reads the raw_kaggle_stats volume populated above — confirm in jobs/kaggle_stats
from ncaa_tournament_predictor.jobs import kaggle_stats

kaggle_stats.run_job()

In [None]:
# Create the cleaned head-to-head table by running the head_to_head job
# NOTE(review): presumably this reads the raw_head_to_head volume populated above — confirm in jobs/head_to_head

from ncaa_tournament_predictor.jobs import head_to_head

head_to_head.run_job()

## Creating a Game Prediction Model
Combine data sets to create a dataset used for training an ML model. Then train and test the resulting model

In [None]:
# An equation for finding the optimal embedding size from a count of distinct items in the dataset
import math

def _get_embedding_output_size(distinct_data_size: int) -> int:
    raw_output_size = 4 * math.sqrt(distinct_data_size)
    return 2 ** round(math.log2(raw_output_size))

print(f"Embedding test of 379 items: {_get_embedding_output_size(379)}")
print(f"Embedding test of 35 items: {_get_embedding_output_size(35)}")

In [None]:
# Build the combined train/test dataset from team stats and head-to-head results,
# then report its size and the number of distinct conferences and teams
from pyspark.sql.functions import rand

from ncaa_tournament_predictor import transformation, tables


team_stats = spark.read.table(tables.cleaned_kaggle_stats)
head_to_head_results = spark.read.table(tables.cleaned_head_to_head_results)

train_test_dataset = transformation.get_training_dataset(team_stats, head_to_head_results)

# Random sample of at most 500 rows, kept around for ad-hoc inspection
shuffled_dataset = train_test_dataset.orderBy(rand())
training_dataset_sample = shuffled_dataset.limit(500)

row_count = train_test_dataset.count()
distinct_conferences = team_stats.select("conference").distinct()
conference_count = distinct_conferences.count()
distinct_teams = team_stats.select("team").distinct()
team_count = distinct_teams.count()
print(f"Rows: {row_count}, distinct conferences: {conference_count}, distinct teams: {team_count}")

In [None]:
# Display the column names of the combined train/test dataset
train_test_dataset.columns

In [None]:
# Prepare the individual-team feature data in the format the Tensorflow models expect
from ncaa_tournament_predictor.tensorflow_models import game_prediction

numeric_feature_columns = game_prediction.columns.individual_team_numeric_feature_columns

# Pre-processing layers are derived from the known data.
# NOTE(review): the "Feature Differences Model" section passes the numeric feature columns
# as an extra middle argument to get_data_preprocessing_layers — confirm that this
# two-argument call still matches the function's current signature.
preprocessing_layers = game_prediction.get_data_preprocessing_layers(train_test_dataset, team_stats)
features_preprocessor = game_prediction.get_features_preprocessor(
    numeric_feature_columns=numeric_feature_columns,
    team_vectorizer=preprocessing_layers.team_vectorizer,
    conference_vectorizer=preprocessing_layers.conference_vectorizer,
    stats_normalizer=preprocessing_layers.stats_normalizer,
)
training_data_preprocessor = game_prediction.get_training_data_preprocessor(
    features_preprocessor=features_preprocessor,
)

# Fixed (arbitrary) seed so the 80/20 train/test split is the same on every run
train_test_split_seed = 105
raw_training_dataset, raw_test_dataset = train_test_dataset.randomSplit(
    [0.8, 0.2],
    seed=train_test_split_seed,
)

# Both splits go through the identical preprocessing call
training_dataset, test_dataset = [
    game_prediction.get_preprocessed_game_prediction_training_dataset(
        raw_split,
        numeric_feature_columns=numeric_feature_columns,
        preprocessor=training_data_preprocessor,
    )
    for raw_split in (raw_training_dataset, raw_test_dataset)
]

In [None]:
# Analyze the Tensors used for model input
from itertools import islice

import pandas as pd

# Only the first 26 elements are needed for inspection; islice stops right there
# instead of pulling one extra element just to reach a `break`
np_array = list(islice(training_dataset, 26))
tensor_sample = pd.DataFrame.from_dict(np_array)

In [None]:
# Build, compile, and train the individual-team game prediction model
import tensorflow as tf
tf.config.run_functions_eagerly(True)

from ncaa_tournament_predictor.tensorflow_models import game_prediction

# NOTE(review): the "Feature Differences Model" section passes the numeric feature columns
# as the first argument to create_model — confirm this two-argument call is still valid.
team_vectorizer = preprocessing_layers.team_vectorizer
conference_vectorizer = preprocessing_layers.conference_vectorizer
model = game_prediction.create_model(team_vectorizer, conference_vectorizer)

# Binary win/lose target: Adam optimizer, binary cross-entropy loss, accuracy metric
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

# Ten passes over the training split, validating against the test split
model.fit(training_dataset, epochs=10, validation_data=test_dataset)


In [None]:
# Score the trained model on the test split and report its accuracy
evaluation_result = model.evaluate(test_dataset)
loss, accuracy = evaluation_result
print(f"Test Accuracy: {accuracy:.4f}")

In [None]:
# Persist the trained model, creating the output directory first so the save
# does not fail on a fresh checkout where ../dist does not exist yet
import os

model_output_path = "../dist/neural_network_3_layer_model.keras"
os.makedirs(os.path.dirname(model_output_path), exist_ok=True)
model.save(model_output_path)

## Feature Differences Model
Rather than looking at raw feature values, calculate the difference between each feature for the two teams (e.g. `t1_offensive_efficiency` - `t2_offensive_efficiency`) to
see if that results in better predictions

In [None]:
# Load the team stats plus the dataset of per-feature differences between two teams

from ncaa_tournament_predictor import queries, tables

team_stats = spark.read.table(tables.cleaned_kaggle_stats)
stats_differences_train_test_dataset = queries.get_stats_differences_training_dataset(spark)

# Roughly 10% sample of the rows, kept for ad-hoc inspection
sample_fraction = 0.1
stats_differences_train_test_dataset_sample = stats_differences_train_test_dataset.sample(
    fraction=sample_fraction
)

In [None]:
# Prepare the stat-comparison feature data in the format the Tensorflow models expect
from ncaa_tournament_predictor.tensorflow_models import game_prediction


numeric_feature_columns = game_prediction.columns.stat_comparison_numeric_feature_columns

# Pre-processing layers are derived from the known data
preprocessing_layers = game_prediction.get_data_preprocessing_layers(
    stats_differences_train_test_dataset,
    numeric_feature_columns,
    team_stats,
)
features_preprocessor = game_prediction.get_features_preprocessor(
    numeric_feature_columns=numeric_feature_columns,
    team_vectorizer=preprocessing_layers.team_vectorizer,
    conference_vectorizer=preprocessing_layers.conference_vectorizer,
    stats_normalizer=preprocessing_layers.stats_normalizer,
)
training_data_preprocessor = game_prediction.get_training_data_preprocessor(
    features_preprocessor=features_preprocessor
)

# Fixed (arbitrary) seed so the 80/20 train/test split is the same on every run
train_test_split_seed = 105
raw_training_dataset, raw_test_dataset = stats_differences_train_test_dataset.randomSplit(
    [0.8, 0.2],
    seed=train_test_split_seed,
)

# Both splits go through the identical preprocessing call
training_dataset, test_dataset = [
    game_prediction.get_preprocessed_game_prediction_training_dataset(
        raw_split,
        numeric_feature_columns=numeric_feature_columns,
        preprocessor=training_data_preprocessor,
    )
    for raw_split in (raw_training_dataset, raw_test_dataset)
]

In [None]:
# Build, compile, and train the stat-comparison game prediction model
import tensorflow as tf
tf.config.run_functions_eagerly(True)

from ncaa_tournament_predictor.tensorflow_models import game_prediction

comparison_feature_columns = game_prediction.columns.stat_comparison_numeric_feature_columns
team_vectorizer = preprocessing_layers.team_vectorizer
conference_vectorizer = preprocessing_layers.conference_vectorizer
model = game_prediction.create_model(
    comparison_feature_columns,
    team_vectorizer,
    conference_vectorizer,
)

# Binary win/lose target: Adam optimizer, binary cross-entropy loss, accuracy metric
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

# Ten passes over the training split, validating against the test split
model.fit(training_dataset, epochs=10, validation_data=test_dataset)


In [None]:
# Persist the trained comparison model, creating the output directory first so
# the save does not fail when ../experimentation/models does not exist yet
import os

comparison_model_output_path = "../experimentation/models/team-comparison-nn-three-layer-dropout.keras"
os.makedirs(os.path.dirname(comparison_model_output_path), exist_ok=True)
model.save(comparison_model_output_path)

In [None]:
# Restore the team-comparison model from its saved .keras file
import tensorflow as tf

saved_team_comparison_model_path = "../experimentation/models/team-comparison-nn-three-layer-dropout.keras"
team_comparison_model = tf.keras.models.load_model(saved_team_comparison_model_path)

In [None]:
# Score the reloaded model on the test split and report its accuracy

comparison_evaluation_result = team_comparison_model.evaluate(test_dataset)
loss, accuracy = comparison_evaluation_result
print(f"Test Accuracy: {accuracy:.4f}")

In [None]:
# Predict the winner of a single matchup with the team-comparison model
import tensorflow as tf

from ncaa_tournament_predictor import queries

predicted_game_raw_df = queries.get_stats_differences(spark=spark, team_1="Mississippi", team_2="Iowa St.", college_season=2025)
# Materialize the result once: count() followed by first() would run the Spark
# query twice. Exactly one row is expected, so collecting it is cheap.
predicted_game_rows = predicted_game_raw_df.collect()
row_count = len(predicted_game_rows)
if row_count != 1:
    raise ValueError(f"Expected exactly 1 row in prediction data but received {row_count} rows")
predicted_game_features = predicted_game_rows[0]
predicted_game_inputs = features_preprocessor(predicted_game_features)
# Add a leading axis to every input so the single game is passed to predict() as a batch of one
batch_predicted_game_inputs = {key: tf.expand_dims(value, axis=0) for key, value in predicted_game_inputs.items()}

prediction = team_comparison_model.predict(batch_predicted_game_inputs)
# The first output of the first (only) batch entry is treated as P(team_1 wins)
probability_of_team_1_win = prediction[0][0]
team_1 = predicted_game_features["team_1"]
team_2 = predicted_game_features["team_2"]
winner = team_1 if probability_of_team_1_win > 0.5 else team_2
# Report the probability from the predicted winner's point of view
winning_probability = probability_of_team_1_win if team_1 == winner else (1.0 - probability_of_team_1_win)
print(f"{team_1} vs {team_2}: The model predicts {winner} will win with a {winning_probability} probability")