# Basketball Shot Prediction Evaluation

This notebook evaluates a trained PyTorch model (`shot_model.pth`) for basketball shot prediction on the test dataset (`test.csv`). The model predicts whether a shot is made or missed based on ball sequences and static features.

## Objectives
- Load and preprocess the test dataset.
- Evaluate the model using multiple metrics: accuracy, precision, recall, F1-score, confusion matrix, and AUC-ROC.
- Visualize the results with plots.

## Outputs
- Logs: `eval.log` with all metrics.
- Plots: `test_metrics.png` (bar plot of metrics), `confusion_matrix.png` (heatmap), `roc_curve.png` (ROC curve).


In [1]:
# Imports and Setup
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve, auc
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import ast
import matplotlib.pyplot as plt
import seaborn as sns
from google.colab import drive
import os
import logging
import pickle

# Mount Google Drive
drive.mount('/content/drive')

# Define paths
data_path = '/content/drive/MyDrive/smai_project/dataset/dataset15/'
output_path = '/content/drive/MyDrive/smai_project/output_graphs/'
model_path = '/content/drive/MyDrive/smai_project/model/model15/'
log_file_path = os.path.join(output_path, 'eval.log')

# Create output directory if it doesn't exist
if not os.path.exists(output_path):
    os.makedirs(output_path)

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file_path),
        logging.StreamHandler()
    ]
)


Mounted at /content/drive


In [2]:
# Feature Extraction Function
def extract_sequence_features(df):
    """Extract features from ball and player sequences."""
    def get_shooter_position(ball_seq, players_seq, personId):
        shooter_x, shooter_y = None, None
        if players_seq and isinstance(players_seq, list) and len(players_seq) > 0:
            first_frame = players_seq[0] if isinstance(players_seq[0], list) else players_seq
            if first_frame and isinstance(first_frame, list):
                for player in first_frame:
                    if len(player) != 5:
                        continue
                    _, player_id, x, y, _ = player
                    if player_id == personId:
                        shooter_x, shooter_y = x, y
                        break

        if shooter_x is None or shooter_y is None:
            if not ball_seq or not isinstance(ball_seq, list) or len(ball_seq) == 0:
                return 0, 0
            first_pos = ball_seq[0]
            if not isinstance(first_pos, list) or len(first_pos) != 3:
                return 0, 0
            shooter_x, shooter_y, _ = first_pos
        return shooter_x, shooter_y

    df['shooter_x'], df['shooter_y'] = zip(*df.apply(
        lambda row: get_shooter_position(row['ball_seq'], row['players_seq'], row['personId']), axis=1
    ))

    df['initial_height'] = df['ball_seq'].apply(lambda seq: seq[0][2] if len(seq) > 0 and len(seq[0]) == 3 else 0)
    df['max_height'] = df['ball_seq'].apply(lambda seq: max([point[2] for point in seq]) if len(seq) > 0 and all(len(point) == 3 for point in seq) else 0)
    df['traj_length'] = df['ball_seq'].apply(len)
    df['shotDistance'] = df.apply(
        lambda row: np.sqrt((row['shooter_x'] - row['basket_x'])**2 + (row['shooter_y'] - row['basket_y'])**2), axis=1
    )
    df['shotDistance'] = df['shotDistance'].replace(0, 1)
    df['traj_curvature'] = df['max_height'] / df['shotDistance']

    df['release_angle'] = df.apply(
        lambda row: np.arctan2(row['shooter_y'] - row['basket_y'], row['shooter_x'] - row['basket_x']), axis=1
    )

    def calculate_defender_proximity(shooter_x, shooter_y, players_seq):
        min_distance = float('inf')
        shooter_team_id = None

        if not players_seq or not isinstance(players_seq, list) or len(players_seq) == 0:
            return 0

        first_frame = players_seq[0] if isinstance(players_seq[0], list) else players_seq
        if not first_frame or not isinstance(first_frame, list):
            return 0

        closest_player_distance = float('inf')
        for player in first_frame:
            if len(player) != 5:
                continue
            team_id, _, x, y, _ = player
            distance = np.sqrt((shooter_x - x)**2 + (shooter_y - y)**2)
            if distance < closest_player_distance:
                closest_player_distance = distance
                shooter_team_id = team_id

        if shooter_team_id is None:
            print(f"Could not determine shooter team for position ({shooter_x}, {shooter_y})")
            return 0

        for player in first_frame:
            if len(player) != 5:
                continue
            team_id, _, x, y, _ = player
            if team_id == shooter_team_id:
                continue
            distance = np.sqrt((shooter_x - x)**2 + (shooter_y - y)**2)
            min_distance = min(min_distance, distance)
        return min_distance if min_distance != float('inf') else 0

    df['defender_proximity'] = df.apply(
        lambda row: calculate_defender_proximity(row['shooter_x'], row['shooter_y'], row['players_seq']), axis=1
    )

    def extract_player_positions(players_seq, shooter_x, shooter_y):
        shooter_team_id = None
        teammates = []
        defenders = []

        if not players_seq or not isinstance(players_seq, list) or len(players_seq) == 0:
            return [0, 0, 0, 0]

        first_frame = players_seq[0] if isinstance(players_seq[0], list) else players_seq
        if not first_frame or not isinstance(first_frame, list):
            return [0, 0, 0, 0]

        closest_player_distance = float('inf')
        for player in first_frame:
            if len(player) != 5:
                continue
            team_id, _, x, y, _ = player
            distance = np.sqrt((shooter_x - x)**2 + (shooter_y - y)**2)
            if distance < closest_player_distance:
                closest_player_distance = distance
                shooter_team_id = team_id

        if shooter_team_id is None:
            print(f"Could not determine shooter team for position ({shooter_x}, {shooter_y})")
            return [0, 0, 0, 0]

        for player in first_frame:
            if len(player) != 5:
                continue
            team_id, _, x, y, _ = player
            if team_id == shooter_team_id:
                teammates.append([x, y])
            else:
                defenders.append([x, y])

        closest_teammate = [0, 0]
        if teammates:
            teammate_distances = [(np.sqrt((shooter_x - tx)**2 + (shooter_y - ty)**2), tx, ty) for tx, ty in teammates]
            teammate_distances.sort()
            closest_teammate = [teammate_distances[1][1], teammate_distances[1][2]] if len(teammate_distances) > 1 else teammate_distances[0][1:3]

        closest_defender = [0, 0]
        if defenders:
            defender_distances = [(np.sqrt((shooter_x - dx)**2 + (shooter_y - dy)**2), dx, dy) for dx, dy in defenders]
            defender_distances.sort()
            closest_defender = [defender_distances[0][1], defender_distances[0][2]]

        return closest_teammate + closest_defender

    df[['teammate_x', 'teammate_y', 'defender_x', 'defender_y']] = df.apply(
        lambda row: extract_player_positions(row['players_seq'], row['shooter_x'], row['shooter_y']), axis=1, result_type='expand'
    )

    def extract_ball_dynamics(ball_seq):
        if not ball_seq or len(ball_seq) < 3:
            return [0, 0, 0, 0, 0, 0]

        velocities = []
        for i in range(1, len(ball_seq)):
            if len(ball_seq[i]) != 3 or len(ball_seq[i-1]) != 3:
                continue
            dx = ball_seq[i][0] - ball_seq[i-1][0]
            dy = ball_seq[i][1] - ball_seq[i-1][1]
            dz = ball_seq[i][2] - ball_seq[i-1][2]
            velocities.append([dx, dy, dz])

        if not velocities:
            return [0, 0, 0, 0, 0, 0]

        accelerations = []
        for i in range(1, len(velocities)):
            ax = velocities[i][0] - velocities[i-1][0]
            ay = velocities[i][1] - velocities[i-1][1]
            az = velocities[i][2] - velocities[i-1][2]
            accelerations.append([ax, ay, az])

        if not accelerations:
            avg_vel_x = np.mean([v[0] for v in velocities])
            avg_vel_y = np.mean([v[1] for v in velocities])
            avg_vel_z = np.mean([v[2] for v in velocities])
            return [avg_vel_x, avg_vel_y, avg_vel_z, 0, 0, 0]

        avg_vel_x = np.mean([v[0] for v in velocities])
        avg_vel_y = np.mean([v[1] for v in velocities])
        avg_vel_z = np.mean([v[2] for v in velocities])
        avg_acc_x = np.mean([a[0] for a in accelerations])
        avg_acc_y = np.mean([a[1] for a in accelerations])
        avg_acc_z = np.mean([a[2] for a in accelerations])

        return [avg_vel_x, avg_vel_y, avg_vel_z, avg_acc_x, avg_acc_y, avg_acc_z]

    df[['avg_vel_x', 'avg_vel_y', 'avg_vel_z', 'avg_acc_x', 'avg_acc_y', 'avg_acc_z']] = df['ball_seq'].apply(
        lambda seq: extract_ball_dynamics(seq)
    ).apply(pd.Series)

    def extract_shot_arc(ball_seq, basket_x, basket_y):
        if not ball_seq or len(ball_seq) < 3:
            return [0, 0]

        try:
            release_x, release_y, release_z = ball_seq[0]
            peak_idx = max(range(len(ball_seq)), key=lambda i: ball_seq[i][2] if len(ball_seq[i]) == 3 else 0)
            peak_x, peak_y, peak_z = ball_seq[peak_idx]

            last_valid_idx = -1
            for i in range(len(ball_seq)-1, 0, -1):
                if len(ball_seq[i]) == 3:
                    last_valid_idx = i
                    break

            if last_valid_idx <= 0:
                return [0, 0]

            p2 = ball_seq[last_valid_idx]
            p1 = ball_seq[max(0, last_valid_idx-1)]

            if len(p1) != 3 or len(p2) != 3:
                return [0, 0]

            horizontal_dist = np.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2)
            entry_angle = 90 if horizontal_dist == 0 else np.degrees(np.arctan2(p2[2] - p1[2], horizontal_dist))

            total_dist = np.sqrt((basket_x - release_x)**2 + (basket_y - release_y)**2)
            arc_height_ratio = 0 if total_dist == 0 else peak_z / total_dist

            return [entry_angle, arc_height_ratio]
        except Exception as e:
            logging.error(f"Error extracting shot arc: {e}")
            return [0, 0]

    df[['entry_angle', 'arc_height_ratio']] = df.apply(
        lambda row: extract_shot_arc(row['ball_seq'], row['basket_x'], row['basket_y']),
        axis=1, result_type='expand'
    )

    return df


In [3]:
# Sequence Processing Function
def process_ball_sequences(df):
    """Process ball sequences for LSTM input."""
    sequences = []
    labels = []

    for _, row in df.iterrows():
        ball_seq = row['ball_seq']
        result = row['shotResult']

        if len(ball_seq) > 37:
            ball_seq = ball_seq[:37]
        elif len(ball_seq) < 37:
            last_pos = ball_seq[-1] if ball_seq else [0, 0, 0]
            while len(ball_seq) < 37:
                ball_seq.append(last_pos)

        sequence = []
        for moment in ball_seq:
            if isinstance(moment, list) and len(moment) == 3:
                sequence.append(moment)
            else:
                sequence.append([0, 0, 0])

        sequences.append(sequence)
        labels.append(result)

    return np.array(sequences), np.array(labels)


In [4]:
# PyTorch Dataset and Model Definition
class ShotDataset(Dataset):
    """Custom Dataset for basketball shot data."""
    def __init__(self, ball_sequences, static_features, labels):
        self.ball_sequences = torch.tensor(ball_sequences, dtype=torch.float32)
        self.static_features = torch.tensor(static_features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.float32)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.ball_sequences[idx], self.static_features[idx], self.labels[idx]

class ShotPredictor(nn.Module):
    """PyTorch model for basketball shot prediction."""
    def __init__(self, ball_input_size=3, static_input_size=20, hidden_size=64):
        super(ShotPredictor, self).__init__()
        self.static_input_size = static_input_size
        self.lstm1 = nn.LSTM(ball_input_size, hidden_size, batch_first=True)
        self.dropout1 = nn.Dropout(0.2)
        self.lstm2 = nn.LSTM(hidden_size, hidden_size // 2, batch_first=True)
        self.dropout2 = nn.Dropout(0.2)

        self.static_fc1 = nn.Linear(static_input_size, 32)
        self.static_relu = nn.ReLU()
        self.static_dropout = nn.Dropout(0.2)

        self.fc_combined = nn.Linear(hidden_size // 2 + 32, 32)
        self.relu_combined = nn.ReLU()
        self.dropout_combined = nn.Dropout(0.2)
        self.fc_out = nn.Linear(32, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, ball_seq, static_features):
        # Verify static features shape
        if static_features.size(-1) != self.static_input_size:
            raise ValueError(f"Expected static_features size {self.static_input_size}, got {static_features.size(-1)}")

        # Ball sequence processing
        lstm_out, _ = self.lstm1(ball_seq)
        lstm_out = self.dropout1(lstm_out)
        lstm_out, _ = self.lstm2(lstm_out)
        lstm_out = self.dropout2(lstm_out)
        lstm_out = lstm_out[:, -1, :]  # Take the last time step

        # Static features processing
        static_out = self.static_fc1(static_features)
        static_out = self.static_relu(static_out)
        static_out = self.static_dropout(static_out)

        # Combine
        combined = torch.cat((lstm_out, static_out), dim=1)
        out = self.fc_combined(combined)
        out = self.relu_combined(out)
        out = self.dropout_combined(out)
        out = self.fc_out(out)
        out = self.sigmoid(out)
        return out


In [5]:
# Load and Preprocess Test Data
print("Loading test dataset...")
test_df = pd.read_csv(data_path + 'test.csv')

test_df['players_seq'] = test_df['players_seq'].apply(ast.literal_eval)
test_df['ball_seq'] = test_df['ball_seq'].apply(ast.literal_eval)

test_df.drop(test_df[~test_df['shotResult'].isin(['Made Shot', 'Missed Shot'])].index, inplace=True)
test_df['shotResult'] = test_df['shotResult'].map({'Made Shot': 1, 'Missed Shot': 0})

print("Extracting features...")
test_df = extract_sequence_features(test_df)

static_features = [
    'shooter_x', 'shooter_y', 'release_angle', 'initial_height', 'max_height',
    'traj_length', 'traj_curvature', 'defender_proximity',
    'teammate_x', 'teammate_y', 'defender_x', 'defender_y',
    'avg_vel_x', 'avg_vel_y', 'avg_vel_z', 'avg_acc_x', 'avg_acc_y', 'avg_acc_z',
    'entry_angle', 'arc_height_ratio'
]

test_df[static_features] = test_df[static_features].replace([np.inf, -np.inf], 0).fillna(0)

print("Processing ball sequences...")
X_test_ball, y_test = process_ball_sequences(test_df)
X_test_ball = X_test_ball.reshape(X_test_ball.shape[0], 37, 3)

# Load scalers
print("Loading scalers...")
scaler_ball_path = os.path.join(model_path, 'scaler_ball.pkl')
scaler_static_path = os.path.join(model_path, 'scaler_static.pkl')

if not os.path.exists(scaler_ball_path) or not os.path.exists(scaler_static_path):
    raise FileNotFoundError("Scaler files not found. Ensure 'scaler_ball.pkl' and 'scaler_static.pkl' are in the model directory.")

with open(scaler_ball_path, 'rb') as f:
    scaler_ball = pickle.load(f)
with open(scaler_static_path, 'rb') as f:
    scaler_static = pickle.load(f)

# Normalize test data
X_test_ball_2d = X_test_ball.reshape(-1, 3)
X_test_ball_2d = scaler_ball.transform(X_test_ball_2d)
X_test_ball = X_test_ball_2d.reshape(X_test_ball.shape)

X_test_static = test_df[static_features].values
X_test_static = scaler_static.transform(X_test_static)

# Create dataset and data loader
test_dataset = ShotDataset(X_test_ball, X_test_static, y_test)
batch_size = 32
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


Loading test dataset...
Extracting features...
Processing ball sequences...
Loading scalers...


In [6]:
# Model Evaluation
print("Loading model...")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_path_file = os.path.join(model_path, 'shot_model.pth')
if not os.path.exists(model_path_file):
    raise FileNotFoundError("Model file 'shot_model.pth' not found in the model directory.")

model = ShotPredictor(ball_input_size=3, static_input_size=len(static_features)).to(device)
model.load_state_dict(torch.load(model_path_file, map_location=device))
model.eval()

# Evaluate model
print("Evaluating model...")
y_true = []
y_pred_binary = []
y_pred_prob = []

with torch.no_grad():
    for ball_seq, static_features, labels in test_loader:
        ball_seq, static_features, labels = ball_seq.to(device), static_features.to(device), labels.to(device)
        outputs = model(ball_seq, static_features).squeeze()
        predicted = (outputs > 0.5).float()

        y_true.extend(labels.cpu().numpy())
        y_pred_binary.extend(predicted.cpu().numpy())
        y_pred_prob.extend(outputs.cpu().numpy())

y_true = np.array(y_true)
y_pred_binary = np.array(y_pred_binary)
y_pred_prob = np.array(y_pred_prob)

# Compute metrics
test_accuracy = accuracy_score(y_true, y_pred_binary)
test_precision = precision_score(y_true, y_pred_binary)
test_recall = recall_score(y_true, y_pred_binary)
test_f1 = f1_score(y_true, y_pred_binary)

# Compute confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred_binary)

# Compute AUC-ROC
fpr, tpr, _ = roc_curve(y_true, y_pred_prob)
roc_auc = auc(fpr, tpr)

# Log metrics
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Precision: {test_precision:.4f}")
print(f"Test Recall: {test_recall:.4f}")
print(f"Test F1-Score: {test_f1:.4f}")
print(f"Confusion Matrix:\n{conf_matrix}")
print(f"AUC-ROC: {roc_auc:.4f}")


Loading model...
Evaluating model...
Test Accuracy: 0.8405
Test Precision: 0.7866
Test Recall: 0.8690
Test F1-Score: 0.8257
Confusion Matrix:
[[487 108]
 [ 60 398]]
AUC-ROC: 0.9274


In [7]:
# Visualization
# Plot metrics
plt.figure(figsize=(8, 5))
metrics = [test_accuracy, test_precision, test_recall, test_f1]
metric_names = ['Accuracy', 'Precision', 'Recall', 'F1-Score']
plt.bar(metric_names, metrics, color=['blue', 'green', 'orange', 'purple'])
plt.title('Test Metrics')
plt.ylim(0, 1)
for i, v in enumerate(metrics):
    plt.text(i, v + 0.02, f"{v:.4f}", ha='center')
plt.savefig(os.path.join(output_path, 'test_metrics.png'))
plt.close()

# Plot confusion matrix
plt.figure(figsize=(6, 5))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Missed', 'Made'], yticklabels=['Missed', 'Made'])
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.savefig(os.path.join(output_path, 'confusion_matrix.png'))
plt.close()

# Plot ROC curve
plt.figure(figsize=(6, 5))
plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='gray', lw=1, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.savefig(os.path.join(output_path, 'roc_curve.png'))
plt.close()

print("Evaluation completed!")


Evaluation completed!


## Results

The evaluation is complete. Check the following outputs:
- **Logs**: `eval.log` in the output directory for all metrics (accuracy, precision, recall, F1-score, confusion matrix, AUC-ROC).
- **Plots**:
  - `test_metrics.png`: Bar plot of accuracy, precision, recall, and F1-score.
  - `confusion_matrix.png`: Heatmap of the confusion matrix.
  - `roc_curve.png`: ROC curve with AUC value.

## Next Steps
- Review the metrics to assess model performance.
- If performance is poor (e.g., low recall or AUC), consider retraining with a more complex model (e.g., more LSTM units) or collecting more data.
