# NBA Match Prediction
## By Shy Ohev Zion and Aviram Shabtai

**Disclaimer** - This notebook is based on [Berkay YALCIN](https://www.kaggle.com/yalcinberkay)'s [work](https://www.kaggle.com/code/yalcinberkay/nba-match-prediction-result-points/notebook)
- While the base notebook uses Pandas and SKLearn for data manipulation and machine learning respectively, <br>
    our code uses PySpark for both, with the addition of Kafka for data streaming.
- The differences in libraries and methodologies also encouraged us to create better functions and tools <br>
    (e.g. a model pipeline to apply regression to all 8 regression columns in one go) 

### TL;DR
In this notebook, a dataset of NBA games from 2012-2019 is used to create a 'model set' that predicts a game's results: each team's score in every quarter. The set is made up of 8 models, one per quarter for each of the two teams.

The results fall short of the standards we set for ourselves and can be improved, but they do demonstrate the use of the technologies learned during the course.

## 1. Initial Setup

### Imports:

In [1]:
# base python
from time import sleep
from io import StringIO
from pathlib import Path
from threading import Thread

# Spark imports
from pyspark.sql import functions as F
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import GBTRegressor, GBTRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import (
    StringIndexer,
    StandardScaler,
    OneHotEncoder,
    VectorAssembler,
)

# Kafka imports
from confluent_kafka import Producer, Consumer

### Paths and Directories:

In [2]:
input_path = Path("input")
matches_path = input_path / "matches_w_player_stats.csv"
processed_input_path = input_path / "preprocessed_matches"

preprocessing_pipeline_path = Path("preprocessing_pipeline")
model_path = Path("models")
model_pipeline_path = model_path / "pipeline"

### Data Column Organization:

The given [dataset](https://www.kaggle.com/datasets/yalcinberkay/nba-matches-dataset-w-player-stats) was taken from https://www.nba.com/stats by the original notebook's author (the meaning of each column can be read [here](https://www.nba.com/stats/help/glossary)).<br>
The following code cell lists all of the columns in the dataset and sorts them into 3 types:

- ```numerical_cols```
- ```categorical_cols``` - columns that need to be converted to numeric columns, which is done using indexing and one-hot encoding
- ```non_informative_cols``` - as the name implies, these columns provide no extra information and are useless in the match result prediction (e.g. game result is derived from the final scores, which in turn are the sum of the scores in each quarter)
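As a small illustration of how the per-player column names and the numerical column list are derived, the sketch below uses a toy subset of the columns (all `toy_*` names are hypothetical and not part of the notebook). It also sorts the set difference, since a plain `list(set(...))` does not guarantee the same order between Python sessions.

```python
# Toy subset of the real column lists; the toy_* names are illustrative only.
toy_player_cols = ["PLAYER_NAME", "PTS"]
toy_team_cols = ["gameId", "teamAbbr", "teamPTS1", "teamAST"]

# Same naming scheme as team_player_cols: TEAM_<stat>_<player slot>.
toy_team_player_cols = [
    f"TEAM_{col}_{i}" for i in range(1, 2 + 1) for col in toy_player_cols
]

toy_all_cols = toy_team_cols + toy_team_player_cols

# Columns that must not end up in the numerical feature list.
toy_excluded = (
    ["gameId"]  # non-informative
    + [c for c in toy_all_cols if "PLAYER_NAME" in c]  # non-informative
    + ["teamAbbr"]  # categorical
    + ["teamPTS1"]  # regression target
)

# sorted() makes the resulting feature order reproducible.
toy_numerical_cols = sorted(set(toy_all_cols).difference(toy_excluded))
print(toy_numerical_cols)
```

Only the numeric statistics remain; identifiers, player names, the categorical column and the regression target are all excluded.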



In [3]:
# List of player-related columns
player_cols = [
    "PLAYER_NAME",
    "MIN",
    "FGM",
    "FGA",
    "FG_PCT",
    "FG3M",
    "FG3A",
    "FG3_PCT",
    "FTM",
    "FTA",
    "FT_PCT",
    "OREB",
    "DREB",
    "REB",
    "AST",
    "STL",
    "BLK",
    "TO",
    "PF",
    "PTS",
    "PLUS_MINUS",
]

team_player_cols = [f"TEAM_{col}_{i}" for i in range(1, 11 + 1) for col in player_cols]

oppt_player_cols = [f"OPPT_{col}_{i}" for i in range(1, 11 + 1) for col in player_cols]

# List of team-related columns
team_cols = [
    "gameId",
    "teamAbbr",
    "opptAbbr",
    "result",
    "teamMin",
    "teamPTS",
    "teamPTS1",
    "teamPTS2",
    "teamPTS3",
    "teamPTS4",
    "opptPTS",
    "opptPTS1",
    "opptPTS2",
    "opptPTS3",
    "opptPTS4",
    "teamFGM",
    "teamFGA",
    "teamFG",
    "team3PM",
    "team3PA",
    "team3PCT",
    "teamFTM",
    "teamFTA",
    "teamFTC",
    "teamORB",
    "teamDRB",
    "teamREB",
    "teamAST",
    "teamSTL",
    "teamBLK",
    "teamTO",
    "teamPF",
    "team2P",
    "teamTS",
    "teamEFG",
    "teamPPS",
    "teamFIC",
    "teamFIC40",
    "teamOrtg",
    "teamDrtg",
    "teamPlay",
]
# List of opponent team-related columns
oppt_cols = [
    "opptMin",
    "opptFGM",
    "opptFGA",
    "opptFG",
    "oppt3PM",
    "oppt3PA",
    "oppt3PCT",
    "opptFTM",
    "opptFTA",
    "opptFTC",
    "opptORB",
    "opptDRB",
    "opptREB",
    "opptAST",
    "opptSTL",
    "opptBLK",
    "opptTO",
    "opptPF",
    "oppt2P",
    "opptTS",
    "opptEFG",
    "opptPPS",
    "opptFIC",
    "opptFIC40",
    "opptOrtg",
    "opptDrtg",
    "opptPlay",
]

last_cols = ["poss", "LM_totalPoint", "LM_dayOffset", "pace"]

matchup_cols = team_cols + team_player_cols + oppt_cols + oppt_player_cols + last_cols

# List of columns that are not informative for the modeling process
non_informative_cols = [
    "gameId",
    "result",
    "opptPTS",
    "teamPTS",
] + [col for col in matchup_cols if "PLAYER_NAME" in col]

categorical_cols = ["teamAbbr", "opptAbbr"]

# List of columns to be used in regression analysis
regression_cols = [
    "teamPTS1",
    "teamPTS2",
    "teamPTS3",
    "teamPTS4",
    "opptPTS1",
    "opptPTS2",
    "opptPTS3",
    "opptPTS4",
]

numerical_cols = list(
    set(matchup_cols).difference(
        non_informative_cols + categorical_cols + regression_cols
    )
)

## 2. Data and Model Preparation

### Data Cleaning
As the matches are from 2012-2019, minor changes are required:
1. Update the abbreviations of teams that have changed their names.
1. Remove rows containing teams that are no longer active.
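The two cleaning steps can be illustrated without Spark. The sketch below applies the same rename map and closed-team list as the notebook to a few made-up rows; the rows and the `clean_rows` helper are hypothetical, and abbreviations are matched exactly rather than by regular expression.

```python
UPDATED_TEAMS = {"NJN": "BKN", "NOH": "NOP"}
CLOSED_TEAMS = {"EST", "FLA", "GNS", "GUA", "MAC"}


def clean_rows(rows):
    """Rename outdated abbreviations, then drop rows with a closed team."""
    cleaned = []
    for row in rows:
        team = UPDATED_TEAMS.get(row["teamAbbr"], row["teamAbbr"])
        oppt = UPDATED_TEAMS.get(row["opptAbbr"], row["opptAbbr"])
        if team in CLOSED_TEAMS or oppt in CLOSED_TEAMS:
            continue
        cleaned.append({**row, "teamAbbr": team, "opptAbbr": oppt})
    return cleaned


# Made-up rows for illustration only.
rows = [
    {"teamAbbr": "NJN", "opptAbbr": "BOS"},
    {"teamAbbr": "LAL", "opptAbbr": "NOH"},
    {"teamAbbr": "EST", "opptAbbr": "MIA"},
]
cleaned_rows = clean_rows(rows)
print(cleaned_rows)
```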

In [4]:
def update_abbrs_and_remove_closed_teams(df):
    updated_teams = {"NJN": "BKN", "NOH": "NOP"}

    for old, new in updated_teams.items():
        df = df.withColumn("teamAbbr", F.regexp_replace("teamAbbr", old, new))
        df = df.withColumn("opptAbbr", F.regexp_replace("opptAbbr", old, new))
    closed_teams = ["EST", "FLA", "GNS", "GUA", "MAC"]

    for team in closed_teams:
        df = df.filter((F.col("opptAbbr") != team) & (F.col("teamAbbr") != team))

    return df

### Preprocessing Pipeline Creation:

In [5]:
numerical_assembler = VectorAssembler(
    inputCols=numerical_cols, outputCol="numericalFeatures", handleInvalid="skip"
)

scaler = StandardScaler(
    inputCol=numerical_assembler.getOutputCol(), outputCol="scaledFeatures"
)

indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_indexed") for c in categorical_cols
]

encoders = [
    OneHotEncoder(
        inputCol=idxr.getOutputCol(), outputCol=f"{idxr.getOutputCol()}_encoded"
    )
    for idxr in indexers
]

categorical_assembler = VectorAssembler(
    inputCols=[enc.getOutputCol() for enc in encoders], outputCol="categoricalFeatures"
)

final_assembler = VectorAssembler(
    inputCols=[scaler.getOutputCol(), categorical_assembler.getOutputCol()],
    outputCol="features",
)

preprocessing_pipeline = Pipeline(
    stages=[numerical_assembler]
    + [scaler]
    + indexers
    + encoders
    + [categorical_assembler]
    + [final_assembler]
)

### Data Functions:
```create_training_data``` - reads the raw match data, fits the pipeline to the data, preprocesses it using the fitted pipeline model, saves the preprocessed data and the model, and finally returns both.

```load_training_data``` - reads the preprocessed match data from a Parquet file (not a CSV, as one of the columns is a ```Vector```) and loads the preprocessing pipeline model. 

In [6]:
def create_training_data():
    _matches = spark.read.csv(str(matches_path), inferSchema=True, header=None).toDF(
        *matchup_cols
    )
    _matches = update_abbrs_and_remove_closed_teams(_matches)

    _preprocessing_pipeline_model = preprocessing_pipeline.fit(_matches)

    _preprocessing_pipeline_model.write().overwrite().save(
        str(preprocessing_pipeline_path)
    )

    _matches = _preprocessing_pipeline_model.transform(_matches).select(
        non_informative_cols + categorical_cols + regression_cols + ["features"]
    )

    _matches.write.save(str(processed_input_path), mode="overwrite")

    return _matches, _preprocessing_pipeline_model


def load_training_data():
    # Parquet files store their own schema, so no schema inference is needed.
    _matches = spark.read.parquet(str(processed_input_path))
    _preprocessing_pipeline_model = PipelineModel.load(str(preprocessing_pipeline_path))

    return _matches, _preprocessing_pipeline_model

### Model Functions
```create_models``` - creates and trains 8 regressors, one per regression column (as mentioned above, one for each team's score in every quarter), and creates a pipeline that streamlines the prediction process: one transformation instead of applying 8 different ones in a row.

```load_models``` - loads the created regressors and pipeline.
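To get a feel for the training cost, the sketch below counts the model fits implied by the grid and fold settings used in ```create_models```. The count assumes the standard `CrossValidator` behaviour of fitting every parameter combination on every fold and then refitting the best combination once on the full training data.

```python
from itertools import product

# Grid values and fold count copied from create_models.
max_depth = [2, 4, 6]
max_bins = [20, 30]
max_iter = [10, 20]
num_folds = 5
num_targets = 8  # one model per regression column

combinations = list(product(max_depth, max_bins, max_iter))

# Every combination is fitted once per fold, plus one final refit of the best one.
fits_per_target = len(combinations) * num_folds + 1
total_fits = fits_per_target * num_targets

print(len(combinations), fits_per_target, total_fits)  # 12 61 488
```

This is why the notebook saves the trained models and prefers loading them over retraining.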

In [7]:
def create_models(train_df):
    _models = dict()

    for reg_col in regression_cols:
        regressor = GBTRegressor(
            labelCol=reg_col,
            featuresCol="features",
            predictionCol=f"{reg_col}_prediction",
        )

        paramGrid = (
            ParamGridBuilder()
            .addGrid(regressor.maxDepth, [2, 4, 6])
            .addGrid(regressor.maxBins, [20, 30])
            .addGrid(regressor.maxIter, [10, 20])
            .build()
        )

        cv = CrossValidator(
            estimator=regressor,
            estimatorParamMaps=paramGrid,
            evaluator=RegressionEvaluator(
                labelCol=regressor.getLabelCol(),
                predictionCol=regressor.getPredictionCol(),
            ),
            numFolds=5,
        )

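        cvModel = cv.fit(train_df)

        # Keep the best model so it can be added to the prediction pipeline below.
        _models[reg_col] = cvModel.bestModel
        _models[reg_col].write().overwrite().save(str(model_path / reg_col))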

    _regression_pipeline_model = PipelineModel(
        stages=[_models[reg_col] for reg_col in regression_cols]
    )
    _regression_pipeline_model.write().overwrite().save(str(model_pipeline_path))

    return _models, _regression_pipeline_model


def load_models():
    return {
        reg_col: GBTRegressionModel.load(str(model_path / reg_col))
        for reg_col in regression_cols
    }, PipelineModel.load(str(model_pipeline_path))

### Loading or Creating the Training Data

In [8]:
try:
    matches, preprocessing_pipeline_model = load_training_data()
    print("data loaded")

except Exception:  # e.g. the preprocessed data has not been created yet
    matches, preprocessing_pipeline_model = create_training_data()
    print("data created and saved")


data loaded


### Splitting the Data and Saving the Test Set
The test set is saved to be used in the Kafka demonstration.

In [9]:
matches_train, matches_test = matches.randomSplit([0.9, 0.1], 42)

matches_test.write.save(str(input_path / "test_input"), mode="overwrite")

23/08/13 23:33:30 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

### Loading or Creating the Models

In [10]:
try:
    models, regression_pipeline_model = load_models()
    print("models and pipeline loaded")

except Exception:  # e.g. the models have not been trained and saved yet
    models, regression_pipeline_model = create_models(matches_train)
    print("models and pipeline created and saved")



models and pipeline loaded


## 3. Application of the Models Created

### Prediction Function
This function applies all of the regression models and adds 3 new columns for the overall match result:
- ```teamPTS_prediction``` - the sum of the home team's predicted points over all quarters
- ```opptPTS_prediction``` - the sum of the away team's predicted points over all quarters
- ```winner_prediction``` - the home team if ```teamPTS_prediction >= opptPTS_prediction```, otherwise the away team
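The derived columns can be mirrored for a single match with plain Python values. This is only a sketch: the `infer_result` helper, the team abbreviations and the raw regressor outputs are hypothetical.

```python
def infer_result(team_abbr, oppt_abbr, team_quarters, oppt_quarters):
    """Mirror the derived columns for one match using plain Python values."""
    # int() drops the fractional part, like the "int" cast applied in Spark.
    team_total = sum(int(q) for q in team_quarters)
    oppt_total = sum(int(q) for q in oppt_quarters)
    # Ties go to the first ("team") side because the comparison is >=.
    winner = team_abbr if team_total >= oppt_total else oppt_abbr
    return team_total, oppt_total, winner


# Hypothetical raw regressor outputs for the four quarters.
result = infer_result(
    "BOS", "LAL", [26.7, 24.2, 27.9, 25.1], [25.3, 26.8, 24.4, 26.6]
)
print(result)  # (102, 101, 'BOS')
```

Note that truncation can drop up to almost one point per quarter; rounding before the cast would be one alternative.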

In [11]:
def predict_and_infer_results(df):
    _predictions = regression_pipeline_model.transform(df)

    # Extract columns related to predictions for both teams and opponents.
    prediction_cols = [f"{reg_col}_prediction" for reg_col in regression_cols]

    # Filter out columns specific to each team's and opponent's predictions.
    team_pred_cols = [col for col in prediction_cols if col.startswith("team")]
    oppt_pred_cols = [col for col in prediction_cols if col.startswith("oppt")]

    for pred_col in prediction_cols:
        _predictions = _predictions.withColumn(pred_col, F.col(pred_col).cast("int"))

    # Compute the total predicted scores for both teams and opponents.
    _predictions = _predictions.withColumn(
        "teamPTS_prediction", sum(_predictions[col] for col in team_pred_cols)
    )
    _predictions = _predictions.withColumn(
        "opptPTS_prediction", sum(_predictions[col] for col in oppt_pred_cols)
    )
    _predictions = _predictions.withColumn(
        "winner_prediction",
        F.when(
            _predictions["teamPTS_prediction"] >= _predictions["opptPTS_prediction"],
            _predictions["teamAbbr"],
        ).otherwise(_predictions["opptAbbr"]),
    )

    return _predictions

### Results on the Test Set

In [12]:
predictions = predict_and_infer_results(matches_test)

resultStr = StringIO("")

for reg_col in regression_cols + ["teamPTS", "opptPTS"]:
    evaluator = RegressionEvaluator(
        labelCol=reg_col, predictionCol=f"{reg_col}_prediction"
    )

    # Cast back to double for the evaluator (the per-quarter predictions
    # were cast to int in predict_and_infer_results).
    predictions = predictions.withColumn(
        f"{reg_col}_prediction", F.col(f"{reg_col}_prediction").cast("double")
    )

    print(f"{reg_col}:", file=resultStr)
    for metric_name in ["rmse", "mae", "var"]:
        evaluator.setMetricName(metric_name)
        print(f"\t{metric_name} - {evaluator.evaluate(predictions)}", file=resultStr)

predictions = predictions.withColumn(
    "winner",
    F.when(predictions["result"] == "Win", predictions["teamAbbr"]).otherwise(
        predictions["opptAbbr"]
    ),
)

evaluator = MulticlassClassificationEvaluator(
    labelCol="winner_indexed",
    predictionCol="winner_prediction_indexed",
    metricName="accuracy",
)

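# Fit a single indexer so both columns share the same team-to-index mapping.
# Indexing each column separately orders labels by per-column frequency,
# which can assign different indices to the same team.
winner_indexer = StringIndexer(
    inputCol="winner", outputCol="winner_indexed", handleInvalid="keep"
).fit(predictions)
predictions = winner_indexer.transform(predictions)
predictions = (
    winner_indexer.setInputCol("winner_prediction")
    .setOutputCol("winner_prediction_indexed")
    .transform(predictions)
)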

print(f"\naccuracy (final result) - {evaluator.evaluate(predictions)}", file=resultStr)

print(resultStr.getvalue())

23/08/13 23:33:43 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/08/13 23:33:43 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
teamPTS1:
	rmse - 4.65109159888563
	mae - 3.6658163265306123
	var - 10.126509787588457
teamPTS2:
	rmse - 4.549192572448928
	mae - 3.639030612244898
	var - 9.119528646917944
teamPTS3:
	rmse - 4.886205071423015
	mae - 3.8520408163265305
	var - 10.632963804143827
teamPTS4:
	rmse - 4.528537629436736
	mae - 3.6403061224489797
	var - 10.264328339754108
opptPTS1:
	rmse - 4.727929391026332
	mae - 3.766581632653061
	var - 8.908137234485867
opptPTS2:
	rmse - 4.7949005650348395
	mae - 3.7818877551020407
	var - 9.703469908371572
opptPTS3:
	rmse - 4.582158167216377
	mae - 3.5982142857142856
	var - 10.144674419512294
opptPTS4:
	rmse - 4.47113759893066
	mae - 3.559948979591837
	var - 11.606539267492606
teamPTS:
	rmse - 5.529835588211738
	mae - 3.8826530612244

**The accuracy is worse than in the original notebook**. 

One likely main cause:<br>
Unlike the original notebook, our models ignore the results of the other quarters when predicting a given quarter, which implicitly (and incorrectly) assumes that the quarter results are independent of each other.<br>
An additional cause may be error accumulation when summing the per-quarter predictions (this can be seen in the variance of ```teamPTS``` and ```opptPTS```).<br>
Sure, we could have built a classification model that only predicts the winner, but the winner should be derived from the points per quarter!
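To make the accumulation argument concrete, let $e_q$ be the prediction error for quarter $q$. The error of the summed score is the sum of the quarter errors, so its variance is

$$
\mathrm{Var}\Big(\sum_{q=1}^{4} e_q\Big) = \sum_{q=1}^{4} \mathrm{Var}(e_q) + 2 \sum_{q < r} \mathrm{Cov}(e_q, e_r)
$$

Under the simplifying assumption (ours, not verified on this data) that the four errors are uncorrelated and each has standard deviation $\sigma$, the total has standard deviation $\sqrt{4\sigma^2} = 2\sigma$. Positively correlated errors push it higher and negatively correlated errors pull it lower.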

## 4. Data Streaming Using Kafka

```produce_matches``` - assuming ```batch_df``` has already been preprocessed, the function applies the prediction function defined above, formats each match's results into a single message and sends the message to a Kafka topic named "nba".
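The message layout can be tried out without Spark or Kafka. In the sketch below, a plain dict stands in for a prediction row; the `format_match_message` helper and the sample values are hypothetical.

```python
def format_match_message(match):
    """Build the text sent to Kafka for one prediction row."""
    lines = [f'match {match["gameId"]}: {match["teamAbbr"]} vs {match["opptAbbr"]}']
    for q in range(1, 5):
        team_pts = match[f"teamPTS{q}_prediction"]
        oppt_pts = match[f"opptPTS{q}_prediction"]
        lines.append(f"    Q{q} predicted scores:    {team_pts} - {oppt_pts}")
    team_total = match["teamPTS_prediction"]
    oppt_total = match["opptPTS_prediction"]
    lines.append(f"    total predicted scores: {team_total} - {oppt_total}")
    lines.append(f'    predicted winner:       {match["winner_prediction"]}')
    return "\n".join(lines) + "\n"


# Made-up prediction row for illustration only.
sample_match = {
    "gameId": 1,
    "teamAbbr": "BOS",
    "opptAbbr": "LAL",
    "teamPTS1_prediction": 26,
    "opptPTS1_prediction": 25,
    "teamPTS2_prediction": 24,
    "opptPTS2_prediction": 26,
    "teamPTS3_prediction": 27,
    "opptPTS3_prediction": 24,
    "teamPTS4_prediction": 25,
    "opptPTS4_prediction": 26,
    "teamPTS_prediction": 102,
    "opptPTS_prediction": 101,
    "winner_prediction": "BOS",
}
message = format_match_message(sample_match)
print(message)
```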

In [13]:
def produce_matches(batch_df, batch_id):
    # Transform the incoming batch using a pre-trained regression model.

    predictions = predict_and_infer_results(batch_df)

    for match in predictions.collect():
        message = (
            f'match {match["gameId"]}: {match["teamAbbr"]} vs {match["opptAbbr"]}\n'
            + f'    Q1 predicted scores:    {match["teamPTS1_prediction"]} - {match["opptPTS1_prediction"]}\n'
            + f'    Q2 predicted scores:    {match["teamPTS2_prediction"]} - {match["opptPTS2_prediction"]}\n'
            + f'    Q3 predicted scores:    {match["teamPTS3_prediction"]} - {match["opptPTS3_prediction"]}\n'
            + f'    Q4 predicted scores:    {match["teamPTS4_prediction"]} - {match["opptPTS4_prediction"]}\n'
            + f'    total predicted scores: {match["teamPTS_prediction"]} - {match["opptPTS_prediction"]}\n'
            + f'    predicted winner:       {match["winner_prediction"]}\n'
        )

        PRODUCER.produce("nba", value=message)
        PRODUCER.flush()

        sleep(0.2)

```consume_matches``` - the function continuously polls a Kafka consumer for new messages from a specified topic. When a new message is received, it decodes the message from bytes to a string and appends it to a text file named after the topic. In case of any exceptions, they're printed to the console. The consumer is always closed in the end.
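The polling loop can be exercised without a running Kafka broker. The sketch below replaces the real consumer with small stand-in classes (`FakeMessage` and `FakeConsumer` are hypothetical) and uses a `threading.Event` instead of a global flag to stop the loop; it collects messages in a list rather than writing them to a file.

```python
import threading
import time


class FakeMessage:
    """Hypothetical stand-in for a Kafka message."""

    def __init__(self, payload):
        self._payload = payload

    def value(self):
        return self._payload

    def error(self):
        return None  # no error on this message


class FakeConsumer:
    """Hypothetical stand-in that hands out queued payloads, then None."""

    def __init__(self, payloads):
        self._payloads = list(payloads)
        self.closed = False

    def poll(self, timeout=0.0):
        if self._payloads:
            return FakeMessage(self._payloads.pop(0))
        time.sleep(timeout)  # mimic waiting for a new message
        return None

    def close(self):
        self.closed = True


def consume(consumer, stop_event, sink):
    """Poll until stop_event is set, collecting decoded messages in sink."""
    try:
        while not stop_event.is_set():
            message = consumer.poll(timeout=0.01)
            if message is not None and message.error() is None:
                sink.append(message.value().decode("utf-8"))
    finally:
        # Always close the consumer, even if the loop raised.
        consumer.close()


stop_event = threading.Event()
received = []
consumer = FakeConsumer([b"match 1\n", b"match 2\n"])
thread = threading.Thread(target=consume, args=(consumer, stop_event, received))
thread.start()

# Wait (at most 2 seconds) until both messages have been consumed.
deadline = time.time() + 2
while len(received) < 2 and time.time() < deadline:
    time.sleep(0.01)

stop_event.set()
thread.join()
print(received, consumer.closed)
```

An event object makes the stop signal explicit and avoids relying on a module-level variable being visible to the thread.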

In [14]:
def consume_matches(consumer, topic):
    try:
        while consumer_active:
            # Polls for new messages
            message = consumer.poll(timeout=3.0)

            # Skip empty polls and messages that carry an error
            if message is not None and message.error() is None:
                # Saves the message to file
                message = message.value().decode("utf-8")
                with open(topic + ".txt", "a") as file:
                    file.write(message)

            # sleep(0.1)

    except Exception as ex:
        print(ex)

    finally:
        # Closes the consumer to commit final offsets
        consumer.close()

### Producer-Consumer Configuration
Here we set up configurations for a Kafka producer and consumer, and initialize them.<br>
The ```consume_matches``` function is set to run on a separate thread, which starts immediately, to continuously consume messages from the "nba" topic without blocking other operations.
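Passing consumer-only properties such as `group.id` to a producer is harmless but produces `CONFWARN` warnings like the ones in this cell's output. One possible way to avoid them, shown here only as a sketch with hypothetical variable names, is to keep the shared settings in one dict and add the consumer-specific ones separately.

```python
# Shared connection settings (address copied from the notebook).
COMMON_CONFIG = {"bootstrap.servers": "localhost:9092"}

# The producer only needs the connection settings.
PRODUCER_CONFIG = dict(COMMON_CONFIG)

# Only the consumer needs a group id and an offset reset policy.
CONSUMER_CONFIG = {
    **COMMON_CONFIG,
    "group.id": "NBA",
    "auto.offset.reset": "smallest",
}

print(PRODUCER_CONFIG)
print(CONSUMER_CONFIG)
```

These dicts would then be passed to `Producer(...)` and `Consumer(...)` respectively.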

In [17]:
CONFIGURATIONS = {
    "bootstrap.servers": "localhost:9092",  # Kafka's server address.
    "group.id": "NBA",  # Consumer group ID for the Kafka consumer.
    "auto.offset.reset": "smallest",  # Determines where the consumer starts reading messages.
}
PRODUCER = Producer(
    CONFIGURATIONS
)  # Initializes a Kafka producer with the specified configurations.
CONSUMER = Consumer(CONFIGURATIONS)  # Similarly, initializes a Kafka consumer.

consumer_active = (
    True  # Flag to control the while loop in the consume_matches function.
)


CONSUMER.subscribe(["nba"])  # Subscribes the consumer to the 'nba' topic.

consumer_thread = Thread(target=consume_matches, args=(CONSUMER, "nba"))
consumer_thread.start()

%4|1691959936.437|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
%4|1691959936.437|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance


### Streaming the Test Data as a Demonstration
The code attempts to read and stream the test input that was saved before, and for each batch of the stream calls the ```produce_matches``` function. If there's any issue (like a streaming error), it prints the problem and safely stops the background consumer thread, ```consume_matches```, which was started earlier.

In [None]:
try:
    test_df = spark.readStream.schema(matches_test.schema).parquet(
        str(input_path / "test_input")
    )

    test_stream = test_df.writeStream.foreachBatch(produce_matches).start()

    test_stream.awaitTermination()

except Exception as ex:
    print(ex)
    # Stop the consume_matches thread by setting the controlling flag to False.
    consumer_active = False
    # Wait for the consumer_thread to finish.
    consumer_thread.join()

23/08/13 23:52:20 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-23e6a27d-7e41-475c-a2d9-a6cf0d538add. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/08/13 23:52:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
