# 1. Data Preprocessing 

#### Load the raw dataset

When loading the dataset skip the first row since it contains metadata that is not valuable for the later steps. Let's saved the new dataframe to a csv file then.

In [274]:
import pandas as pd

# Dataset file path
inputPath = 'data/dataset.csv'

# Read the dataset, skip the first row which contains metadata
data = pd.read_csv(inputPath, sep=',', skiprows=1) 

# Save the new dataset
outputPath = "data/dataset-version-1.csv"
data.to_csv(outputPath, index=False)

#### Load the newly created dataset-version-1.csv to a spark session.

All the next preprocessing tasks will be executed with Spark.

In [275]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# New Spark Session
spark = SparkSession.builder \
    .appName("NBA Player Performance Prediction") \
    .getOrCreate()

# Load the dataset
inputPath = "data/dataset-version-1.csv"
data = spark.read.csv(inputPath, header=True, inferSchema=True)

# Show the dataset
data.show(5)

+-------+--------+--------+--------------------+-------------------+------+----------+-------------+-------------------------+-----------------+---------------+---------------------+----------+---------------+-------------------------+-----------------+---------------+---------------------+----------+------------+-------------------------+-----------------+---------------+---------------------+-----+-----------+--------------------+
|EVENTID|EVENTNUM| GAME_ID|     HOMEDESCRIPTION|       PCTIMESTRING|PERIOD|PLAYER1_ID| PLAYER1_NAME|PLAYER1_TEAM_ABBREVIATION|PLAYER1_TEAM_CITY|PLAYER1_TEAM_ID|PLAYER1_TEAM_NICKNAME|PLAYER2_ID|   PLAYER2_NAME|PLAYER2_TEAM_ABBREVIATION|PLAYER2_TEAM_CITY|PLAYER2_TEAM_ID|PLAYER2_TEAM_NICKNAME|PLAYER3_ID|PLAYER3_NAME|PLAYER3_TEAM_ABBREVIATION|PLAYER3_TEAM_CITY|PLAYER3_TEAM_ID|PLAYER3_TEAM_NICKNAME|SCORE|SCOREMARGIN|  VISITORDESCRIPTION|
+-------+--------+--------+--------------------+-------------------+------+----------+-------------+-------------------------+

                                                                                

#### Find the Home team and Visitor team

We need to find the Home team name and Visitor team name. This is how we are going to do that,

1. Group the result by GAME_ID column.
2. Find the EVENTNUM column for each GAME_ID. The value in the EVENTNUM column should be 1. Find that row.
3. Go through that row and find the value in PLAYER1_TEAM_NICKNAME. That is the Home team name.
4. Go through the same row and find the value in PLAYER2_TEAM_NICKNAME. That is the Visitor team name.
5. Add these new columns for each row in the dataset.
6. Perform that for every GAME_ID.

In [276]:
# Step 1: Filter rows where EVENTNUM = 1
eventnum_1_rows = data.filter(col("EVENTNUM") == 1)

# Step 2: Select the HOME_TEAM and VISITOR_TEAM for each GAME_ID
home_and_visitor_teams = eventnum_1_rows.select(
    col("GAME_ID"),
    col("PLAYER1_TEAM_NICKNAME").alias("HOME_TEAM_NICKNAME"),
    col("PLAYER2_TEAM_NICKNAME").alias("VISITOR_TEAM_NICKNAME")
)

# Step 3: Join back to the original dataset to add the new columns
data = data.join(home_and_visitor_teams, "GAME_ID", "left")

# Show the final dataset with new columns
data.show(5)

+--------+-------+--------+--------------------+-------------------+------+----------+-------------+-------------------------+-----------------+---------------+---------------------+----------+---------------+-------------------------+-----------------+---------------+---------------------+----------+------------+-------------------------+-----------------+---------------+---------------------+-----+-----------+--------------------+------------------+---------------------+
| GAME_ID|EVENTID|EVENTNUM|     HOMEDESCRIPTION|       PCTIMESTRING|PERIOD|PLAYER1_ID| PLAYER1_NAME|PLAYER1_TEAM_ABBREVIATION|PLAYER1_TEAM_CITY|PLAYER1_TEAM_ID|PLAYER1_TEAM_NICKNAME|PLAYER2_ID|   PLAYER2_NAME|PLAYER2_TEAM_ABBREVIATION|PLAYER2_TEAM_CITY|PLAYER2_TEAM_ID|PLAYER2_TEAM_NICKNAME|PLAYER3_ID|PLAYER3_NAME|PLAYER3_TEAM_ABBREVIATION|PLAYER3_TEAM_CITY|PLAYER3_TEAM_ID|PLAYER3_TEAM_NICKNAME|SCORE|SCOREMARGIN|  VISITORDESCRIPTION|HOME_TEAM_NICKNAME|VISITOR_TEAM_NICKNAME|
+--------+-------+--------+-----------------

Now we have identified the Home team and the Visitor team. Let's do some more cleaning now. After going through the dataset, we can see that there are many missing values. We will be handling those missing values in the next steps.

#### Drop rows where "SCORE" column is empty

Before executing any data cleaning steps, we need to get an idea about the dimensions about this dataset. Therefore let's create a helper function to read the dimensions of the dataset. 

In [277]:
# Helper function to get the dimensions of a given dataset
def show_dataset_dimensions(df, message):
    """
    Displays the dimensions of a given dataframe along with a message.

    Parameters:
    - df : Dataframe
    - message : Custom message
    """
    # Get dimensions
    num_rows = df.count()
    num_columns = len(df.columns)

    # Print dimensions
    print("Dataset consists of", num_rows, "rows and", num_columns, "columns ", message)

SCORE column is one of the most important columns in the dataset for the given task. Therefore we need to clean that column first. There are many missing values in this column, therefore we will remove all the rows where SCORE column is empty in the next steps. 

In [278]:
# Show the dimensions before cleaning
show_dataset_dimensions(data, "before cleaning")

# Remove rows where "SCORE" column is empty 
score_cleaned_data = data.filter(col('SCORE').isNotNull())

# Show the dimensions after cleaning
show_dataset_dimensions(score_cleaned_data, "after cleaning")

Dataset consists of 537157 rows and 29 columns  before cleaning
Dataset consists of 137590 rows and 29 columns  after cleaning


We have 537,157 rows in the initial dataset. After performing the above cleaning step, we have 137,590 rows remaining with 27 columns.

#### Process the SCORE coloumn further.

Next let's process the "SCORE" coloumn. The score is saved the format like this: 13 - 10. If the score value does not follow this format, let's remove that row first. Then we can extract these scores to 2 different colomns saying "HOME_SCORE", and "VISITOR_SCORE". After that we can remove the "SCORE" column.

In [279]:
from pyspark.sql.functions import col, regexp_extract

# Remove rows where SCORE does not follow the "number - number" format
valid_score_data = score_cleaned_data.filter(score_cleaned_data.SCORE.rlike(r"^\d+\s*-\s*\d+$"))

# Extract scores into two separate columns: HOME_SCORE and VISITOR_SCORE
score_processed_data = valid_score_data.withColumn("HOME_SCORE", regexp_extract(col("SCORE"), r"^(\d+)\s*-\s*\d+$", 1)) \
                                 .withColumn("VISITOR_SCORE", regexp_extract(col("SCORE"), r"^\d+\s*-\s*(\d+)$", 1))

# Cast the extracted scores to integer
score_processed_data = score_processed_data.withColumn("HOME_SCORE", col("HOME_SCORE").cast("integer")) \
                               .withColumn("VISITOR_SCORE", col("VISITOR_SCORE").cast("integer"))

# Drop the original SCORE column
score_processed_data = score_processed_data.drop("SCORE")

# Show the processed dataset
score_processed_data.show(5)

# Show the dimensions before cleaning
show_dataset_dimensions(score_cleaned_data, "before cleaning")

# Show the dimensions before cleaning
show_dataset_dimensions(score_processed_data, "after cleaning")

+--------+-------+--------+--------------------+-------------------+------+----------+-------------+-------------------------+-----------------+---------------+---------------------+----------+---------------+-------------------------+-----------------+---------------+---------------------+----------+------------+-------------------------+-----------------+---------------+---------------------+-----------+--------------------+------------------+---------------------+----------+-------------+
| GAME_ID|EVENTID|EVENTNUM|     HOMEDESCRIPTION|       PCTIMESTRING|PERIOD|PLAYER1_ID| PLAYER1_NAME|PLAYER1_TEAM_ABBREVIATION|PLAYER1_TEAM_CITY|PLAYER1_TEAM_ID|PLAYER1_TEAM_NICKNAME|PLAYER2_ID|   PLAYER2_NAME|PLAYER2_TEAM_ABBREVIATION|PLAYER2_TEAM_CITY|PLAYER2_TEAM_ID|PLAYER2_TEAM_NICKNAME|PLAYER3_ID|PLAYER3_NAME|PLAYER3_TEAM_ABBREVIATION|PLAYER3_TEAM_CITY|PLAYER3_TEAM_ID|PLAYER3_TEAM_NICKNAME|SCOREMARGIN|  VISITORDESCRIPTION|HOME_TEAM_NICKNAME|VISITOR_TEAM_NICKNAME|HOME_SCORE|VISITOR_SCORE|
+-----

After that step, we have 137,590 rows and 30 columns remaining to work with.

#### Identify valuable coloumns from the dataset.

The next step is to extract the important coloumns in the dataset that are relevant for the task. After going through the dataset manually, below columns seems to be the valuable among others. Therefore will filter only these columns and proceed with them in the next steps.

In [280]:
# Relevant columns selected
selected_columns = [
    'GAME_ID', 'HOMEDESCRIPTION', 'PERIOD', 'VISITORDESCRIPTION',
    'PLAYER1_ID', 'PLAYER1_NAME', 'PLAYER1_TEAM_NICKNAME','PLAYER1_TEAM_ID', 
    'PLAYER2_ID', 'PLAYER2_NAME', 'PLAYER2_TEAM_NICKNAME', 'PLAYER2_TEAM_ID', 
    'PLAYER3_ID', 'PLAYER3_NAME', 'PLAYER3_TEAM_NICKNAME', 'PLAYER3_TEAM_ID',
    'HOME_TEAM_NICKNAME', 'VISITOR_TEAM_NICKNAME', 'HOME_SCORE', 'VISITOR_SCORE',
]

# Dataset with relevant columns selected
coloumn_cleaned_data = score_processed_data.select(*selected_columns)

# Show the dimensions before cleaning
show_dataset_dimensions(coloumn_cleaned_data, "after cleaning")

# Show a sample of cleaned data
coloumn_cleaned_data.show(5)

Dataset consists of 137590 rows and 20 columns  after cleaning
+--------+--------------------+------+--------------------+----------+-------------+---------------------+---------------+----------+---------------+---------------------+---------------+----------+------------+---------------------+---------------+------------------+---------------------+----------+-------------+
| GAME_ID|     HOMEDESCRIPTION|PERIOD|  VISITORDESCRIPTION|PLAYER1_ID| PLAYER1_NAME|PLAYER1_TEAM_NICKNAME|PLAYER1_TEAM_ID|PLAYER2_ID|   PLAYER2_NAME|PLAYER2_TEAM_NICKNAME|PLAYER2_TEAM_ID|PLAYER3_ID|PLAYER3_NAME|PLAYER3_TEAM_NICKNAME|PLAYER3_TEAM_ID|HOME_TEAM_NICKNAME|VISITOR_TEAM_NICKNAME|HOME_SCORE|VISITOR_SCORE|
+--------+--------------------+------+--------------------+----------+-------------+---------------------+---------------+----------+---------------+---------------------+---------------+----------+------------+---------------------+---------------+------------------+---------------------+----------+----

After that step, we have 137,590 rows and 20 columns remaining to work with.

#### Handle missing values in the coloumns.

Let's identify the missing values in the dataset as the next step. Let's identify the columns where all values are empty, null or na. These columns will not help for our task. Therefore after identifying, we can remove them from the dataset.

In [281]:
from pyspark.sql.functions import col

def drop_all_null_columns(dataframe):
    """
    Identify columns in given dataframe where all values are null, 'NA', or empty strings
    and remove those columns from the dataframe.

    Parameters:
    - dataframe : Dataframe

    Returns:
    - tuple
        A tuple containing:
        - The cleaned dataframe.
        - List of column names that were removed

    """
    # Count the number of null, 'na' or empty values in every column
    null_counts = {
        column: dataframe.filter(
            (col(column).isNull()) | (col(column) == "NA") | (col(column) == "")
        ).count()
        for column in dataframe.columns
    }

    # Identify columns where all rows are null, 'na', or empty
    all_null_columns = [column for column, null_count in null_counts.items() if null_count == dataframe.count()]

    # Drop those columns
    cleaned_data = dataframe.drop(*all_null_columns)

    return cleaned_data, all_null_columns

In [282]:
# Cleaned data and list of columns to be dropped
coloumn_cleaned_data, dropped_columns = drop_all_null_columns(coloumn_cleaned_data)

# Print the columns that were dropped
print("Columns with all empty values:", dropped_columns)

# Show the dimensions before cleaning
show_dataset_dimensions(coloumn_cleaned_data, "after cleaning")

Columns with all empty values: ['PLAYER3_NAME', 'PLAYER3_TEAM_NICKNAME', 'PLAYER3_TEAM_ID']
Dataset consists of 137590 rows and 17 columns  after cleaning


As we can see below columns contains only null, na or empty values,

* PLAYER3_NAME
* PLAYER3_TEAM_NICKNAME
* PLAYER3_TEAM_ID

All above columns were dropped from the dataset. Before this cleaning process we had 17 columns remaining. Now we have 12 coloumns remaining in the dataset.

Since most of the columns that are relevant to "Player 3" are dropped from the dataset, we can also remove the 'PLAYER3_ID' as well since only PLAYER3_ID column alone does not provide any valuable insights.

In [283]:
# Show the dimensions before cleaning
show_dataset_dimensions(coloumn_cleaned_data, "before cleaning")

# Drop the PLAYER3_ID column
cleaned_data = coloumn_cleaned_data.drop("PLAYER3_ID")

# Show the dimensions before cleaning
show_dataset_dimensions(cleaned_data, "after cleaning")

# Show a sample of cleaned data
cleaned_data.show(5)

Dataset consists of 137590 rows and 17 columns  before cleaning
Dataset consists of 137590 rows and 16 columns  after cleaning
+--------+--------------------+------+--------------------+----------+-------------+---------------------+---------------+----------+---------------+---------------------+---------------+------------------+---------------------+----------+-------------+
| GAME_ID|     HOMEDESCRIPTION|PERIOD|  VISITORDESCRIPTION|PLAYER1_ID| PLAYER1_NAME|PLAYER1_TEAM_NICKNAME|PLAYER1_TEAM_ID|PLAYER2_ID|   PLAYER2_NAME|PLAYER2_TEAM_NICKNAME|PLAYER2_TEAM_ID|HOME_TEAM_NICKNAME|VISITOR_TEAM_NICKNAME|HOME_SCORE|VISITOR_SCORE|
+--------+--------------------+------+--------------------+----------+-------------+---------------------+---------------+----------+---------------+---------------------+---------------+------------------+---------------------+----------+-------------+
|20001116|                NULL|     1|Iverson 21' Jump ...|       947|Allen Iverson|                76ers|  1.6

Let's find out who won the game and who lost. Let's follow the below logic to find that out.

Approach: 

Group by GAME_ID. Then get the maximum HOME_SCORE and maximum VISITOR_SCORE. 

 If maximum HOME_SCORE > maximum VISITOR_SCORE
	then WINING_TEAM = HOME_TEAM_NICKNAME
	then LOOSING_TEAM = VISITOR_TEAM_NICKNAME
 Else 
	then WINING_TEAM = VISITOR_TEAM_NICKNAME
	then LOOSING_TEAM = HOME_TEAM_NICKNAME

Save this WINING_TEAM, and LOOSING_TEAM in the data frame.

In [284]:
from pyspark.sql.functions import col, max as spark_max, when

# Step 1: Group by GAME_ID and calculate maximum scores
game_results = cleaned_data.groupBy("GAME_ID", "HOME_TEAM_NICKNAME", "VISITOR_TEAM_NICKNAME").agg(
    spark_max("HOME_SCORE").alias("MAX_HOME_SCORE"),
    spark_max("VISITOR_SCORE").alias("MAX_VISITOR_SCORE")
)

# Step 2: Determine WINING_TEAM, LOOSING_TEAM, and LOOSING_TEAM_SCORE
game_results = game_results.withColumn(
    "WINING_TEAM",
    when(col("MAX_HOME_SCORE") > col("MAX_VISITOR_SCORE"), col("HOME_TEAM_NICKNAME"))
    .otherwise(col("VISITOR_TEAM_NICKNAME"))
).withColumn(
    "LOOSING_TEAM",
    when(col("MAX_HOME_SCORE") > col("MAX_VISITOR_SCORE"), col("VISITOR_TEAM_NICKNAME"))
    .otherwise(col("HOME_TEAM_NICKNAME"))
).withColumn(
    "LOOSING_TEAM_SCORE",
    when(col("MAX_HOME_SCORE") > col("MAX_VISITOR_SCORE"), col("MAX_VISITOR_SCORE"))
    .otherwise(col("MAX_HOME_SCORE"))
)

# Step 3: Join back to the original dataset
final_game_result_data = cleaned_data.join(game_results, on=["GAME_ID", "HOME_TEAM_NICKNAME", "VISITOR_TEAM_NICKNAME"], how="left")

# Show the final result
final_game_result_data.show(truncate=False)

+--------+------------------+---------------------+--------------------------------------------------+------+-------------------------------------------------+----------+---------------+---------------------+---------------+----------+---------------+---------------------+---------------+----------+-------------+--------------+-----------------+-----------+------------+------------------+
|GAME_ID |HOME_TEAM_NICKNAME|VISITOR_TEAM_NICKNAME|HOMEDESCRIPTION                                   |PERIOD|VISITORDESCRIPTION                               |PLAYER1_ID|PLAYER1_NAME   |PLAYER1_TEAM_NICKNAME|PLAYER1_TEAM_ID|PLAYER2_ID|PLAYER2_NAME   |PLAYER2_TEAM_NICKNAME|PLAYER2_TEAM_ID|HOME_SCORE|VISITOR_SCORE|MAX_HOME_SCORE|MAX_VISITOR_SCORE|WINING_TEAM|LOOSING_TEAM|LOOSING_TEAM_SCORE|
+--------+------------------+---------------------+--------------------------------------------------+------+-------------------------------------------------+----------+---------------+---------------------+--------

Now we have identified the wining team and loosing team. 

Now we can remove some columns since we have the wining and loosing teams.

In [285]:
# Relevant columns selected
selected_columns = [
    'GAME_ID', 'WINING_TEAM', 'LOOSING_TEAM', 'LOOSING_TEAM_SCORE', 'HOMEDESCRIPTION', 'VISITORDESCRIPTION'
]

# Dataset with relevant columns selected
final_game_result_data = final_game_result_data.select(*selected_columns)

# Show the results
final_game_result_data.show()

+--------+-----------+------------+------------------+--------------------+--------------------+
| GAME_ID|WINING_TEAM|LOOSING_TEAM|LOOSING_TEAM_SCORE|     HOMEDESCRIPTION|  VISITORDESCRIPTION|
+--------+-----------+------------+------------------+--------------------+--------------------+
|20001116|      76ers|        Heat|                81|                NULL|Iverson 21' Jump ...|
|20001116|      76ers|        Heat|                81|Mason 17' Jump Sh...|                NULL|
|20001116|      76ers|        Heat|                81|                NULL|Lynch  Layup (2 P...|
|20001116|      76ers|        Heat|                81|                NULL|Iverson  Driving ...|
|20001116|      76ers|        Heat|                81|                NULL|Iverson 24' 3PT J...|
|20001116|      76ers|        Heat|                81|Grant 17' Jump Sh...|                NULL|
|20001116|      76ers|        Heat|                81|                NULL|Iverson 20' Jump ...|
|20001116|      76ers|        

Now let's find how many points each player scored for each game they played.

As we can see, "GAME_ID", "HOMEDESCRIPTION" and the "VISITORDESCRIPTION" column contains the information about the points scored by different players. Therefore we will extract these columns to a new dataframe to simplify the dataset.

Let's merge "HOMEDESCRIPTION", and "VISITORDESCRIPTION" columns to simplify the data frame further.

In [286]:
from pyspark.sql.functions import concat_ws, trim

# Merge HOMEDESCRIPTION and VISITORDESCRIPTION into a new column named "MERGED_DESCRIPTION"
player_score_for_each_game = final_game_result_data.withColumn("MERGED_DESCRIPTION", concat_ws(" ", "HOMEDESCRIPTION", "VISITORDESCRIPTION"))

# Remove rows where MERGED_DESCRIPTION is NULL or empty
player_score_for_each_game = player_score_for_each_game.filter(trim(col("MERGED_DESCRIPTION")) != "")

# Show the results
player_score_for_each_game.show()

+--------+-----------+------------+------------------+--------------------+--------------------+--------------------+
| GAME_ID|WINING_TEAM|LOOSING_TEAM|LOOSING_TEAM_SCORE|     HOMEDESCRIPTION|  VISITORDESCRIPTION|  MERGED_DESCRIPTION|
+--------+-----------+------------+------------------+--------------------+--------------------+--------------------+
|20001116|      76ers|        Heat|                81|                NULL|Iverson 21' Jump ...|Iverson 21' Jump ...|
|20001116|      76ers|        Heat|                81|Mason 17' Jump Sh...|                NULL|Mason 17' Jump Sh...|
|20001116|      76ers|        Heat|                81|                NULL|Lynch  Layup (2 P...|Lynch  Layup (2 P...|
|20001116|      76ers|        Heat|                81|                NULL|Iverson  Driving ...|Iverson  Driving ...|
|20001116|      76ers|        Heat|                81|                NULL|Iverson 24' 3PT J...|Iverson 24' 3PT J...|
|20001116|      76ers|        Heat|                81|Gr

Now the "player_score_for_each_game" data frame has "MERGED_DESCRIPTION". This column contains sentences like values. Therefore we need to extract the player names and the point they have scored from these sentences like values. 

The first word in this column is the name of the player. And the point is stored like this "(2 PTS)". We will extract these values to 2 new columns.

In [287]:
from pyspark.sql.functions import regexp_extract, col

# Define regex patterns
player_name_pattern = r"^(\w+)"  # First word as player name
points_pattern = r"\((\d+) PTS\)"  # Number inside parentheses followed by 'PTS'

# Extract PLAYER_NAME and PLAYER_POINTS
player_score_for_each_game = player_score_for_each_game.withColumn("PLAYER_NAME", regexp_extract(col("MERGED_DESCRIPTION"), player_name_pattern, 1)) \
           .withColumn("PLAYER_POINTS", regexp_extract(col("MERGED_DESCRIPTION"), points_pattern, 1).cast("int"))

# Show the resulting dataset
player_score_for_each_game.show()

+--------+-----------+------------+------------------+--------------------+--------------------+--------------------+-----------+-------------+
| GAME_ID|WINING_TEAM|LOOSING_TEAM|LOOSING_TEAM_SCORE|     HOMEDESCRIPTION|  VISITORDESCRIPTION|  MERGED_DESCRIPTION|PLAYER_NAME|PLAYER_POINTS|
+--------+-----------+------------+------------------+--------------------+--------------------+--------------------+-----------+-------------+
|20000773|       Heat|        Suns|                85|                NULL|Robinson 2' Runni...|Robinson 2' Runni...|   Robinson|            2|
|20000773|       Heat|        Suns|                85|                NULL|Marion 8' Jump Sh...|Marion 8' Jump Sh...|     Marion|            2|
|20000773|       Heat|        Suns|                85|                NULL|Robinson Free Thr...|Robinson Free Thr...|   Robinson|            3|
|20000773|       Heat|        Suns|                85|Mason  Layup (2 P...|                NULL|Mason  Layup (2 P...|      Mason|       

Let's remove the "MERGED_DESCRIPTION" column from the data frame as it will not be needed anymore.

In [288]:
# Relevant columns selected
selected_columns = [
    'GAME_ID', 'WINING_TEAM', 'LOOSING_TEAM', 'PLAYER_NAME', 'PLAYER_POINTS'
]

# Dataset with relevant columns selected
player_score_for_each_game = player_score_for_each_game.select(*selected_columns)

# Show the results
player_score_for_each_game.show()

+--------+-----------+------------+-----------+-------------+
| GAME_ID|WINING_TEAM|LOOSING_TEAM|PLAYER_NAME|PLAYER_POINTS|
+--------+-----------+------------+-----------+-------------+
|20000773|       Heat|        Suns|   Robinson|            2|
|20000773|       Heat|        Suns|     Marion|            2|
|20000773|       Heat|        Suns|   Robinson|            3|
|20000773|       Heat|        Suns|      Mason|            2|
|20000773|       Heat|        Suns|      Grant|            2|
|20000773|       Heat|        Suns|      Grant|            4|
|20000773|       Heat|        Suns|       Elie|            3|
|20000773|       Heat|        Suns|      Mason|            4|
|20000773|       Heat|        Suns|     Marion|            4|
|20000773|       Heat|        Suns|     Marion|            6|
|20000773|       Heat|        Suns|      Jones|            2|
|20000773|       Heat|        Suns|      Jones|            3|
|20000773|       Heat|        Suns|      Jones|            4|
|2000077

Now we have only above columns. Let's get the points scored for each player in each match.

In [289]:
from pyspark.sql.functions import col, row_number, last
from pyspark.sql.window import Window

# Define a window specification to find the last PLAYER_POINTS value
window_spec = Window.partitionBy("GAME_ID", "PLAYER_NAME").orderBy(col("GAME_ID").asc())

# Add row number to identify the last value
data_with_row_num = player_score_for_each_game.withColumn("row_num", row_number().over(window_spec))

# Group by GAME_ID and PLAYER_NAME to get the last PLAYER_POINTS value as TOTAL_SCORE
player_score_for_each_game = player_score_for_each_game.groupBy("GAME_ID", "WINING_TEAM", "LOOSING_TEAM", "PLAYER_NAME").agg(
    last("PLAYER_POINTS").alias("TOTAL_SCORE")
)

# Show the dimensions of the resulting dataset
num_rows = player_score_for_each_game.count()
num_cols = len(player_score_for_each_game.columns)
print(f"The dataset contains {num_rows} rows and {num_cols} columns after processing.")

# Show the resulting dataset
player_score_for_each_game.show(truncate=False)

The dataset contains 20682 rows and 5 columns after processing.
+--------+------------+-------------+-----------+-----------+
|GAME_ID |WINING_TEAM |LOOSING_TEAM |PLAYER_NAME|TOTAL_SCORE|
+--------+------------+-------------+-----------+-----------+
|20000233|Timberwolves|Knicks       |Day        |6          |
|20000021|Hawks       |Knicks       |Knight     |2          |
|20000614|Hawks       |Wizards      |Smith      |9          |
|20000650|Nuggets     |Bucks        |Bowen      |4          |
|20000197|Magic       |Bucks        |Williams   |2          |
|20000206|Pacers      |Lakers       |Fox        |7          |
|20001066|76ers       |Raptors      |Clark      |11         |
|20000439|Nets        |Bulls        |Miller     |2          |
|20001116|76ers       |Heat         |Mutombo    |5          |
|20000693|Nuggets     |Pistons      |Strickland |5          |
|20001051|Spurs       |Bucks        |Porter     |4          |
|20000270|Nuggets     |Magic        |Posey      |7          |
|20000

This dataset will be used as the data for model building task.

# 2. Model Building


Let's create a helper method to train the model.

In [290]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Train and evaluate the model
def train_model(data, team_a, team_b):
    # Step 1: Filter Matches between team_a and team_b
    filtered_data = data.filter(
        (col("WINING_TEAM").isin([team_a, team_b])) & (col("LOOSING_TEAM").isin([team_a, team_b]))
    )

    # Step 2: Extract relevant features
    filtered_data = filtered_data.withColumn("IS_WINNING_TEAM", when(col("WINING_TEAM") == team_a, 1).otherwise(0))
    
    # Encode PLAYER_NAME
    player_indexer = StringIndexer(inputCol="PLAYER_NAME", outputCol="PLAYER_INDEX")
    player_indexer_model = player_indexer.fit(filtered_data)
    filtered_data = player_indexer_model.transform(filtered_data)
    
    # Assemble features
    assembler = VectorAssembler(inputCols=["PLAYER_INDEX", "IS_WINNING_TEAM"], outputCol="features")
    filtered_data = assembler.transform(filtered_data)

    # Step 3: Split data frame into train and test
    train_data, test_data = filtered_data.randomSplit([0.8, 0.2], seed=42)

    # Step 4: Train the Model
    lr = LinearRegression(featuresCol="features", labelCol="TOTAL_SCORE", maxIter=10, regParam=0.3, elasticNetParam=0.8)
    lr_model = lr.fit(train_data)

    # Step 5: Evaluate the Model
    predictions = lr_model.transform(test_data)
    evaluator = RegressionEvaluator(labelCol="TOTAL_SCORE", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print(f"Model trained. Root Mean Squared Error (RMSE): {rmse}")

    return lr_model, player_indexer_model, assembler

Let's create a helper function to predict the player points

In [291]:
def predict_player_points(lr_model, player_indexer_model, assembler, player_name, team_a, team_b):
    # Create new data for prediction
    player_data = spark.createDataFrame([(player_name,)], ["PLAYER_NAME"])
    player_data = player_indexer_model.transform(player_data)
    player_data = player_data.withColumn("IS_WINNING_TEAM", when(col("PLAYER_NAME") == player_name, 1).otherwise(0))
    player_data = assembler.transform(player_data)

    # Make prediction
    player_prediction = lr_model.transform(player_data)
    prediction_value = player_prediction.select("prediction").first()[0]  # Get the prediction value

    # Round up the prediction to the nearest integer
    required_points = math.ceil(prediction_value)

    # Format the output
    result = f"{player_name} should score at least {required_points} points to win the match against {team_b}"
    return result


In [292]:
# Example
team_a = "Timberwolves"
team_b = "Knicks"
player_name = "Day"

# Train the model
lr_model, player_indexer_model, assembler = train_model(player_score_for_each_game, team_a, team_b)

# Predict player points
result = predict_player_points(lr_model, player_indexer_model, assembler, player_name, team_a, team_b)
print(result)

Model trained. Root Mean Squared Error (RMSE): 6.16014071781651
Day should score at least 9 points to win the match against Knicks
