In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!ls "/content/drive/MyDrive/BDP-Spark/"

NBA-data.csv  Task-2-Spark.ipynb  Task-3-Spark-ML.ipynb


In [3]:
!pip install pyspark



In [4]:
from pyspark.sql import SparkSession

# Initialize the spark session
spark = SparkSession.builder.appName("NBA-Analysis").getOrCreate()
spark

In [5]:
# load the data
data = spark.read.csv("/content/drive/MyDrive/BDP-Spark/NBA-data.csv", header=True, inferSchema=True)
data.show()

+-------+--------+--------+--------------------+-------------------+------+----------+----------------+-------------------------+-----------------+---------------+---------------------+----------+------------+-------------------------+-----------------+---------------+---------------------+----------+-------------+-------------------------+-----------------+---------------+---------------------+------+-----------+--------------------+-------------+-------------+------+------+
|EVENTID|EVENTNUM| GAME_ID|     HOMEDESCRIPTION|       PCTIMESTRING|PERIOD|PLAYER1_ID|    PLAYER1_NAME|PLAYER1_TEAM_ABBREVIATION|PLAYER1_TEAM_CITY|PLAYER1_TEAM_ID|PLAYER1_TEAM_NICKNAME|PLAYER2_ID|PLAYER2_NAME|PLAYER2_TEAM_ABBREVIATION|PLAYER2_TEAM_CITY|PLAYER2_TEAM_ID|PLAYER2_TEAM_NICKNAME|PLAYER3_ID| PLAYER3_NAME|PLAYER3_TEAM_ABBREVIATION|PLAYER3_TEAM_CITY|PLAYER3_TEAM_ID|PLAYER3_TEAM_NICKNAME| SCORE|SCOREMARGIN|  VISITORDESCRIPTION|CLEANED_SCORE|POINTS_SCORED|WINNER| LOSER|
+-------+--------+--------+-----------

# Task 2 - Q3 - Spark Analysis

In [6]:
# import necessary libraries
from pyspark.sql.functions import col, udf, when, lit
from pyspark.sql.types import IntegerType, StringType, ArrayType, StructType, StructField
from pyspark.sql import functions as F
from pyspark.sql.window import Window

## Preprocessing Data

In [7]:
# function for clean the score
def clean_score(score):
  # handle null score
  if score is None:
    return None
  if isinstance(score, str):
    month_mapping = {'Jan': '1', 'Feb': '2', 'Mar': '3', 'Apr': '4',
                     'May': '5', 'Jun': '6', 'Jul': '7', 'Aug': '8',
                     'Sep': '9', 'Oct': '10', 'Nov': '11', 'Dec': '12'}
    parts = score.split('-')
    if parts[0] in month_mapping:
      return f"{month_mapping[parts[0]]}-{parts[1]}"
    elif parts[1] in month_mapping:
      return f"{parts[0]}-{month_mapping[parts[1]]}"

    return score if '-' in score else None

  return None

In [8]:
# Apply cleaning function to the DataFrame
data = data.withColumn("CLEANED_SCORE", F.udf(clean_score,
                                              StringType())(data['SCORE']))

# Filter out rows where CLEANED_SCORE is null
cleaned_data = data.filter(data['CLEANED_SCORE'].isNotNull())

# Create a window specification to partition by GAME_ID and order by EVENTID
window_spec = Window.partitionBy("GAME_ID").orderBy("EVENTID")

# Initialize the previous score by using lag() function to get previous CLEANED_SCORE value
cleaned_data = cleaned_data.withColumn("PREVIOUS_SCORE",
                                       F.lag("CLEANED_SCORE").over(window_spec))

In [9]:
print(cleaned_data)

DataFrame[EVENTID: int, EVENTNUM: int, GAME_ID: int, HOMEDESCRIPTION: string, PCTIMESTRING: timestamp, PERIOD: int, PLAYER1_ID: int, PLAYER1_NAME: string, PLAYER1_TEAM_ABBREVIATION: string, PLAYER1_TEAM_CITY: string, PLAYER1_TEAM_ID: int, PLAYER1_TEAM_NICKNAME: string, PLAYER2_ID: int, PLAYER2_NAME: string, PLAYER2_TEAM_ABBREVIATION: string, PLAYER2_TEAM_CITY: string, PLAYER2_TEAM_ID: int, PLAYER2_TEAM_NICKNAME: string, PLAYER3_ID: int, PLAYER3_NAME: string, PLAYER3_TEAM_ABBREVIATION: string, PLAYER3_TEAM_CITY: string, PLAYER3_TEAM_ID: int, PLAYER3_TEAM_NICKNAME: string, SCORE: string, SCOREMARGIN: string, VISITORDESCRIPTION: string, CLEANED_SCORE: string, POINTS_SCORED: int, WINNER: string, LOSER: string, PREVIOUS_SCORE: string]


In [10]:
# Function to calculate play score using UDF
def calculate_play_score(df):
  # inner function for calculate score
  def calculate_score(cleaned_score, previous_score):
    # If score is None, play score is 0
    if cleaned_score is None:
      return 0
    # First valid score, return sum of current score
    if previous_score is None:
      team1, team2 = map(int, cleaned_score.split('-'))
      return team1 + team2
    else:
      # Calculate difference between previous and current score
      previous_team1, previous_team2 = map(int, previous_score.split('-')) if previous_score else (0, 0)
      current_team1, current_team2 = map(int, cleaned_score.split('-'))
      return (current_team1 - previous_team1) + (current_team2 - previous_team2)

  # Create a UDF to apply the score calculation function row-wise
  calculate_score_udf = F.udf(calculate_score, IntegerType())

  return df.withColumn("PLAY_SCORE", calculate_score_udf(df['CLEANED_SCORE'], df['PREVIOUS_SCORE']))

In [11]:
# Apply the play score calculation
cleaned_data = calculate_play_score(cleaned_data)

# Show the result
cleaned_data.select("GAME_ID", "EVENTID", "PLAYER1_NAME", "PLAYER1_TEAM_NICKNAME", "SCORE",
                    "CLEANED_SCORE", "PREVIOUS_SCORE", "PLAY_SCORE").show(truncate=False)

+--------+-------+----------------+---------------------+-------+-------------+--------------+----------+
|GAME_ID |EVENTID|PLAYER1_NAME    |PLAYER1_TEAM_NICKNAME|SCORE  |CLEANED_SCORE|PREVIOUS_SCORE|PLAY_SCORE|
+--------+-------+----------------+---------------------+-------+-------------+--------------+----------+
|20000001|5      |Theo Ratliff    |76ers                |Jan-00 |1-00         |NULL          |1         |
|20000001|9      |Marcus Camby    |Knicks               |1-Jan  |1-1          |1-00          |1         |
|20000001|10     |Marcus Camby    |Knicks               |2-Jan  |2-1          |1-1           |1         |
|20000001|20     |Theo Ratliff    |76ers                |2-Mar  |2-3          |2-1           |2         |
|20000001|21     |Allan Houston   |Knicks               |4-Mar  |4-3          |2-3           |2         |
|20000001|27     |Theo Ratliff    |76ers                |4-May  |4-5          |4-3           |2         |
|20000001|30     |Eric Snow       |76ers      

## 1. Percentage of players who scored 40 or more points in a single game

In [12]:
# Group by PLAYER1_NAME and GAME_ID and calculate the total points
player_score = cleaned_data.groupBy("PLAYER1_NAME" , "GAME_ID").agg(
    F.sum("PLAY_SCORE").alias("TOTAL_POINTS")
)

In [13]:
# filter player score 40 or more
filter_players = player_score.filter(player_score.TOTAL_POINTS >= 40)

In [14]:
# number of players
no_of_players = data.filter(col("PLAYER1_NAME").isNotNull()).select("PLAYER1_NAME").distinct().count()

In [15]:
# filter unique players who scored 40 or more points
filter_players_count = filter_players.filter(col("PLAYER1_NAME").isNotNull())\
.select("PLAYER1_NAME").distinct().count()

In [17]:
# Percentage of players who scored 40 or more points
player_percentage = (filter_players_count / no_of_players) * 100

print(f"The Percentage of players who scored 40 or more points in a single game: {player_percentage:.2f}%")

The Percentage of players who scored 40 or more points in a single game: 6.36%


## 2. The total number of matches lost by each team.

In [18]:
# Group by GAME_ID and PLAYER1_TEAM_NICKNAME
# after calculate the sum of PLAY_SCORE for each team
game_scores = cleaned_data.groupBy("GAME_ID", "PLAYER1_TEAM_NICKNAME").agg(
    F.sum("PLAY_SCORE").alias("TOTAL_SCORE")
)

In [19]:
# window function to rank the teams within each game by their total score
window_spec = Window.partitionBy("GAME_ID").orderBy(F.col("TOTAL_SCORE").desc())

In [20]:
# Add a column to rank teams based on their total score within each game
game_scores = game_scores.withColumn("RANK", F.row_number().over(window_spec))

In [21]:
# identify the winner and loser
game_results = game_scores.withColumn(
    "RESULT",
    F.when(F.col("RANK") == 1, "Winner").otherwise("Loser")
)

In [22]:
# filter using PLAYER1_TEAM_NICKNAME
game_results = game_results.filter((F.col("PLAYER1_TEAM_NICKNAME").isNotNull()))
game_results.show(truncate=False)

+--------+---------------------+-----------+----+------+
|GAME_ID |PLAYER1_TEAM_NICKNAME|TOTAL_SCORE|RANK|RESULT|
+--------+---------------------+-----------+----+------+
|20000001|76ers                |101        |1   |Winner|
|20000001|Knicks               |72         |2   |Loser |
|20000002|Cavaliers            |86         |1   |Winner|
|20000002|Nets                 |82         |2   |Loser |
|20000003|Magic                |97         |1   |Winner|
|20000003|Wizards              |86         |2   |Loser |
|20000004|Hornets              |106        |1   |Winner|
|20000004|Hawks                |82         |2   |Loser |
|20000005|Pistons              |104        |1   |Winner|
|20000005|Raptors              |95         |2   |Loser |
|20000006|Kings                |100        |1   |Winner|
|20000006|Bulls                |81         |2   |Loser |
|20000007|Mavericks            |97         |1   |Winner|
|20000007|Bucks                |93         |2   |Loser |
|20000008|Timberwolves         

In [23]:
# Filter the game results to include only the "Loser" teams
losses = game_results.filter(
    (F.col("RESULT") == "Loser") & (F.col("PLAYER1_TEAM_NICKNAME").isNotNull())
)

In [24]:
# Group by PLAYER1_TEAM_NICKNAME and count the number of losses for each team
team_losses = losses.groupBy("PLAYER1_TEAM_NICKNAME").agg(
    F.count("RESULT").alias("NUMBER_OF_LOSSES")
)

In [25]:
# Sort the teams by the number of losses in descending order
team_losses_sorted = team_losses.orderBy(F.col("NUMBER_OF_LOSSES").desc())
team_losses_sorted.show(truncate=False)

+---------------------+----------------+
|PLAYER1_TEAM_NICKNAME|NUMBER_OF_LOSSES|
+---------------------+----------------+
|Bulls                |67              |
|Warriors             |65              |
|Wizards              |63              |
|Grizzlies            |59              |
|Hawks                |57              |
|Nets                 |56              |
|Cavaliers            |52              |
|Clippers             |51              |
|Pistons              |50              |
|Celtics              |46              |
|Nuggets              |42              |
|Pacers               |41              |
|Magic                |39              |
|SuperSonics          |38              |
|Rockets              |37              |
|Hornets              |36              |
|Raptors              |35              |
|Timberwolves         |35              |
|Knicks               |34              |
|Trail Blazers        |32              |
+---------------------+----------------+
only showing top

# Task - 3 - Machine Learning

In [26]:
# Perform a left join to retain all records from cleaned data
merged_data = data.join(
    cleaned_data.select("GAME_ID", "EVENTID", "PLAY_SCORE"),
    on=["GAME_ID", "EVENTID"],
    how="left"
)

In [27]:
# Fill null values in the PLAY_SCORE column with 0
new_data = merged_data.withColumn(
    "PLAY_SCORE",
    F.when(F.col("PLAY_SCORE").isNull(), 0).otherwise(F.col("PLAY_SCORE"))
)

# Show the resulting DataFrame
new_data.show()

+--------+-------+--------+--------------------+-------------------+------+----------+----------------+-------------------------+-----------------+---------------+---------------------+----------+------------+-------------------------+-----------------+---------------+---------------------+----------+-------------+-------------------------+-----------------+---------------+---------------------+------+-----------+--------------------+-------------+-------------+------+------+----------+
| GAME_ID|EVENTID|EVENTNUM|     HOMEDESCRIPTION|       PCTIMESTRING|PERIOD|PLAYER1_ID|    PLAYER1_NAME|PLAYER1_TEAM_ABBREVIATION|PLAYER1_TEAM_CITY|PLAYER1_TEAM_ID|PLAYER1_TEAM_NICKNAME|PLAYER2_ID|PLAYER2_NAME|PLAYER2_TEAM_ABBREVIATION|PLAYER2_TEAM_CITY|PLAYER2_TEAM_ID|PLAYER2_TEAM_NICKNAME|PLAYER3_ID| PLAYER3_NAME|PLAYER3_TEAM_ABBREVIATION|PLAYER3_TEAM_CITY|PLAYER3_TEAM_ID|PLAYER3_TEAM_NICKNAME| SCORE|SCOREMARGIN|  VISITORDESCRIPTION|CLEANED_SCORE|POINTS_SCORED|WINNER| LOSER|PLAY_SCORE|
+--------+------

In [28]:
# Determine total points scored by each team per game
team_scores = (
    new_data.groupBy("GAME_ID", "PLAYER1_TEAM_NICKNAME")
    .agg(F.sum("PLAY_SCORE").alias("TOTAL_POINTS"))
)

In [29]:
# Determine winners and losers
game_results = (
    team_scores.alias("a")
    .join(
        team_scores.alias("b"),
        (F.col("a.GAME_ID") == F.col("b.GAME_ID")) &
        (F.col("a.PLAYER1_TEAM_NICKNAME") != F.col("b.PLAYER1_TEAM_NICKNAME")),
    )
    .select(
        F.col("a.GAME_ID").alias("Game_ID"),
        F.col("a.PLAYER1_TEAM_NICKNAME").alias("Winner"),
        F.col("b.PLAYER1_TEAM_NICKNAME").alias("Loser"),
    )
    .filter(F.col("a.TOTAL_POINTS") > F.col("b.TOTAL_POINTS"))
)

In [30]:
# Filter players on the winning team and calculate total points per player
winning_players_total_score = (
    new_data.alias("players")
    .join(game_results.alias("results"),
          (F.col("players.GAME_ID") == F.col("results.Game_ID")) &
          (F.col("players.PLAYER1_TEAM_NICKNAME") == F.col("results.Winner")),
          "inner")
    .groupBy("players.GAME_ID", "players.PLAYER1_NAME", "players.PLAYER1_TEAM_NICKNAME", "results.Loser")
    .agg(F.sum("players.PLAY_SCORE").alias("Total_Points"))
    .select(
        F.col("GAME_ID").alias("Game_ID"),
        F.col("PLAYER1_NAME").alias("Player_Name"),
        F.col("PLAYER1_TEAM_NICKNAME").alias("Team"),
        F.col("Loser").alias("Opponent"),
        F.col("Total_Points").alias("Points_Scored")
    )
)

# Show the results
winning_players_total_score.show()

+--------+------------------+---------+--------+-------------+
| Game_ID|       Player_Name|     Team|Opponent|Points_Scored|
+--------+------------------+---------+--------+-------------+
|20000002|     Lamond Murray|Cavaliers|    Nets|           17|
|20000002|Clar. Weatherspoon|Cavaliers|    Nets|            8|
|20000002|      Andre Miller|Cavaliers|    Nets|            8|
|20000002|     Matt Harpring|Cavaliers|    Nets|           16|
|20000002|Zydrunas Ilgauskas|Cavaliers|    Nets|            0|
|20000002|     Wesley Person|Cavaliers|    Nets|            6|
|20000002|     Chris Gatling|Cavaliers|    Nets|           11|
|20000002|       Bimbo Coles|Cavaliers|    Nets|           15|
|20000002|    Robert Traylor|Cavaliers|    Nets|            5|
|20000002|  Cedric Henderson|Cavaliers|    Nets|            0|
|20000002|        Chris Mihm|Cavaliers|    Nets|            0|
|20000004|       Baron Davis|  Hornets|   Hawks|           20|
|20000004|    Jamal Mashburn|  Hornets|   Hawks|       

## 1. Prepare the data

In [31]:
model_data = winning_players_total_score.select(
    F.col("Player_Name").alias("Player"),
    F.col("Opponent"),
    F.col("Points_Scored").alias("Actual_Score")
)

## 2. Encode categorical variables

In [32]:
from pyspark.ml.feature import StringIndexer

player_indexer = StringIndexer(inputCol="Player", outputCol="Player_Index")
opponent_indexer = StringIndexer(inputCol="Opponent", outputCol="Opponent_Index")

In [33]:
indexed_data = player_indexer.fit(model_data).transform(model_data)
indexed_data = opponent_indexer.fit(indexed_data).transform(indexed_data)

In [34]:
from pyspark.ml.feature import OneHotEncoder

# OneHotEncoder for Player and Opponent
player_encoder = OneHotEncoder(inputCol="Player_Index", outputCol="Player_OneHot")
opponent_encoder = OneHotEncoder(inputCol="Opponent_Index", outputCol="Opponent_OneHot")

encoded_data = player_encoder.fit(indexed_data).transform(indexed_data)
encoded_data = opponent_encoder.fit(encoded_data).transform(encoded_data)

## 3. Assemble features

In [35]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["Player_OneHot", "Opponent_OneHot"],
    outputCol="features"
)

data_features = assembler.transform(encoded_data)

## 4. Split the Data

In [36]:
train_data, test_data = data_features.randomSplit([0.8, 0.2], seed=42)

## 5. Model Training, Prediction, and Evaluation

In [37]:
from pyspark.ml.regression import (
    LinearRegression,
    RandomForestRegressor,
    GBTRegressor,
    DecisionTreeRegressor
)
from pyspark.ml.evaluation import RegressionEvaluator

In [38]:
# build the models
models = {
    "Linear Regression": LinearRegression(featuresCol="features", labelCol="Actual_Score"),
    "Random Forest Regression": RandomForestRegressor(featuresCol="features", labelCol="Actual_Score"),
    "GBT Regression": GBTRegressor(featuresCol="features", labelCol="Actual_Score"),
    "Decision Tree Regression": DecisionTreeRegressor(featuresCol="features", labelCol="Actual_Score"),
}

In [39]:
# Train and Evaluate every model
results = []
evaluator = RegressionEvaluator(labelCol="Actual_Score", predictionCol="prediction", metricName="rmse")

for name, model in models.items():
    # Train the model
    trained_model = model.fit(train_data)
    # Predict on the test data
    predictions = trained_model.transform(test_data)
    # Evaluate RMSE
    rmse = evaluator.evaluate(predictions)
    # Evaluate R2
    r2 = evaluator.setMetricName("r2").evaluate(predictions)
    # Append results
    results.append((name, rmse, r2, predictions.select("Player", "Opponent", "Actual_Score", "prediction")))

In [40]:
print("Model Comparison Results:")
for name, rmse, r2, predictions_df in results:
    print(f"\nModel: {name}")
    print(f"  RMSE: {rmse:.3f}")
    print(f"  R2: {r2:.3f}")

Model Comparison Results:

Model: Linear Regression
  RMSE: 6.687
  R2: 0.483

Model: Random Forest Regression
  RMSE: 0.108
  R2: 0.108

Model: GBT Regression
  RMSE: 0.260
  R2: 0.260

Model: Decision Tree Regression
  RMSE: 0.077
  R2: 0.077


## 6. Hyper Parameter Tuning

In [41]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [42]:
# Define the Linear Regression model
linear_model = LinearRegression(featuresCol="features", labelCol="Actual_Score")

# Create a parameter grid for hyperparameter tuning
paramGrid = ParamGridBuilder().addGrid(linear_model.regParam,
 [0.01, 0.1, 0.5, 1.0]).addGrid(linear_model.elasticNetParam,
  [0.0, 0.5, 1.0]).build()

# Define the evaluator for RMSE
evaluator = RegressionEvaluator(labelCol="Actual_Score",
                                predictionCol="prediction",
                                metricName="rmse")

# Set up CrossValidator
crossval = CrossValidator(estimator=linear_model,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

In [43]:
# Train the model with cross-validation
cv_model = crossval.fit(train_data)

# Get the best model
best_model = cv_model.bestModel

# Evaluate the model on test data
predictions = best_model.transform(test_data)

# Calculate RMSE and R2 on test data
rmse = evaluator.evaluate(predictions)
r2_evaluator = RegressionEvaluator(labelCol="Actual_Score",
                                   predictionCol="prediction",
                                   metricName="r2")
r2 = r2_evaluator.evaluate(predictions)

In [44]:
# Show predictions
print("Best Linear Regression Model Predictions (Top 5):")
predictions.select("Player", "Opponent", "Actual_Score", "prediction").show(5)

# Output the evaluation metrics
print(f"Best Linear Regression Model - RMSE: {rmse}, R2: {r2}")

# Print the best hyperparameters
print(f"Best regParam: {best_model._java_obj.getRegParam()}")
print(f"Best elasticNetParam: {best_model._java_obj.getElasticNetParam()}")

Best Linear Regression Model Predictions (Top 5):
+----------+---------+------------+-----------------+
|    Player| Opponent|Actual_Score|       prediction|
+----------+---------+------------+-----------------+
|A.C. Green|    Bulls|           0|5.373035715762285|
|A.C. Green|  Celtics|          12|5.629694488773146|
|A.C. Green|Grizzlies|           2|5.391726663948985|
|A.C. Green|     Jazz|           0|4.786912338401125|
|A.C. Green|  Raptors|          14|5.833808353523322|
+----------+---------+------------+-----------------+
only showing top 5 rows

Best Linear Regression Model - RMSE: 6.686678555332445, R2: 0.48263004170596113
Best regParam: 0.01
Best elasticNetParam: 1.0
