In [6]:
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn

# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


In [7]:
# Show the working directory first: the data path below is relative, so it
# only resolves when the notebook is started from the project root.
print("Current Working Directory:", os.getcwd())
file_path = 'data/cleaned_data.csv'
# The file is read as latin1 text; the column index is printed for reference.
processed_data = pd.read_csv(filepath_or_buffer=file_path, encoding='latin1')
print(processed_data.columns)

Current Working Directory: C:\Users\gushi\LTU\TennisStrokePrediction
Index(['Pt', 'Set1', 'Set2', 'Gm1', 'Gm2', 'Pts', 'Gm#', 'TB?', 'rallyLen',
       'server_score_raw', 'receiver_score_raw', 'player_score',
       'opponent_score', 'shot1', 'shot2', 'shot3', 'shot4', 'winner_array',
       'unforced_array', 'is_deuce', 'is_break_point', 'is_game_point',
       'point_diff', 'total_sets_played', 'total_games_played', 'is_tiebreak',
       'match_pressure_score', 'rally_intensity', 'fatigue_index',
       'estimated_stamina', 'shot1_encoded', 'shot2_encoded', 'shot3_encoded',
       'shot4_encoded'],
      dtype='object')


In [None]:
import ast

def parse_array_column(column):
    """Convert a Series of stringified lists into float32 NumPy arrays.

    Parameters
    ----------
    column : pandas.Series
        Each element is either the text of a Python list literal
        (e.g. ``"[1, 0, 0]"``) or a value that is already array-like.

    Returns
    -------
    pandas.Series
        Same index as ``column``; every element is a ``np.float32`` array.

    Raises
    ------
    ValueError, SyntaxError
        If a string element is not a valid Python literal.
    """

    def _to_array(value):
        # Only text needs parsing. Already-parsed values go straight to
        # np.asarray, which keeps the function idempotent: re-running a cell
        # that overwrites the column in place no longer fails in literal_eval.
        if isinstance(value, str):
            value = ast.literal_eval(value)
        return np.asarray(value, dtype=np.float32)

    return column.apply(_to_array)

# Replace both stringified-list columns with their parsed float32 arrays.
for array_col in ('winner_array', 'unforced_array'):
    processed_data[array_col] = parse_array_column(processed_data[array_col])

### RNN Implementation (1D CNN + LSTM)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNNLSTMShotPredictor(nn.Module):
    """1D-CNN + LSTM classifier over a short sequence of per-shot feature vectors.

    A temporal convolution mixes every timestep with the one before it, an
    LSTM consumes the convolved sequence, and the last layer's final hidden
    state goes through dropout and a linear layer to produce class logits.

    The convolution is padded on the left only, so it emits exactly one output
    per input timestep. The previous symmetric ``padding=1`` with
    ``kernel_size=2`` produced ``T + 1`` steps, and the extra trailing step
    (last shot mixed with zero padding) was the one the LSTM's final hidden
    state was read from. Parameter shapes and state-dict keys are unchanged.

    Args:
        num_classes: Size of the output logit vector.
        input_size: Number of features per timestep.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    CONV_CHANNELS = 64
    CONV_KERNEL_SIZE = 2

    def __init__(self, num_classes, input_size, hidden_size, num_layers):
        super().__init__()

        # Padding is applied explicitly (left side only) in forward(), so the
        # layer itself must not pad.
        self.conv1d = nn.Conv1d(
            in_channels=input_size,          # one channel per input feature
            out_channels=self.CONV_CHANNELS,
            kernel_size=self.CONV_KERNEL_SIZE,
            padding=0
        )

        self.lstm = nn.LSTM(
            input_size=self.CONV_CHANNELS,   # conv output channels per timestep
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=False              # forward() reads hidden[-1], i.e. the forward direction
        )

        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class logits for a batch of sequences.

        Args:
            x: Float tensor of shape ``[B, T, input_size]``.

        Returns:
            Tensor of shape ``[B, num_classes]`` (unnormalised logits).
        """
        # Conv1d expects channels first: [B, T, input_size] -> [B, input_size, T]
        x = x.permute(0, 2, 1)

        # Left-pad the time axis by (kernel_size - 1) so the output has length T
        # and step t only sees steps <= t.
        x = F.pad(x, (self.CONV_KERNEL_SIZE - 1, 0))
        x = F.relu(self.conv1d(x))           # [B, 64, T]

        # Back to batch-first sequence layout for the LSTM: [B, T, 64]
        x = x.permute(0, 2, 1)

        # hidden: [num_layers, B, hidden_size]; keep the top layer's final state
        _, (hidden, _) = self.lstm(x)
        final_hidden = self.dropout(hidden[-1])   # [B, hidden_size]

        return self.classifier(final_hidden)      # [B, num_classes]


#### Training using label encoding

In [None]:
import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
import pickle

# Work on a copy so the loaded frame stays untouched and this cell can be re-run.
processed_data_rnn = processed_data.copy()

# Expand the winner / unforced arrays into one numeric column per array position.
winner_df = pd.DataFrame(processed_data_rnn['winner_array'].tolist(), index=processed_data_rnn.index)
winner_df.columns = [f'winner_{i}' for i in range(winner_df.shape[1])]
unforced_df = pd.DataFrame(processed_data_rnn['unforced_array'].tolist(), index=processed_data_rnn.index)
unforced_df.columns = [f'unforced_{i}' for i in range(unforced_df.shape[1])]
processed_data_rnn = pd.concat(
    [processed_data_rnn.drop(columns=['winner_array', 'unforced_array']), winner_df, unforced_df],
    axis=1,
)

# Filter out serve tokens from shot4 only
def is_rally(tok):
    """Return True for a non-empty string that does not start with '0', '4', '5' or '6'."""
    # tok[:1] is '' for an empty string, so empty tokens are rejected instead of
    # raising IndexError.
    return isinstance(tok, str) and tok[:1] not in {'', '0', '4', '5', '6'}

processed_data_rnn = processed_data_rnn[processed_data_rnn['shot4'].apply(is_rally)].copy()

# Refit LabelEncoder ONLY on shot1-4 after shot4 filtering, so inputs and
# target share one vocabulary.
shot_cols = ['shot1', 'shot2', 'shot3']
all_shots = pd.concat(
    [processed_data_rnn[col] for col in shot_cols + ['shot4']]
).astype(str)

label_encoder = LabelEncoder()
label_encoder.fit(all_shots)

# Save the encoder so predictions can be mapped back to shot tokens later.
with open("label_encoder_full.pkl", "wb") as f:
    pickle.dump(label_encoder, f)

# Encode all shots
for col in shot_cols + ['shot4']:
    processed_data_rnn[col] = label_encoder.transform(processed_data_rnn[col].astype(str))

# Not used by this cell; kept only in case a later cell refers to it.
required_shots = {'shot1', 'shot2', 'shot3', 'shot4'}

# Judging by their names, the *_encoded columns are pre-computed encodings of
# shot1..shot4. Leaving 'shot4_encoded' among the inputs would hand the model
# the target; the other three merely duplicate the freshly encoded shot columns.
# NOTE(review): confirm against the script that produced cleaned_data.csv.
leaky_cols = [
    col for col in ('shot1_encoded', 'shot2_encoded', 'shot3_encoded', 'shot4_encoded')
    if col in processed_data_rnn.columns
]

# Define target and features
target = processed_data_rnn['shot4']
features = processed_data_rnn.drop(columns=['shot4'] + leaky_cols)

context_cols = [col for col in features.columns if col not in shot_cols]

# Pull everything into float32 matrices in one go instead of iterating rows.
try:
    context_matrix = features[context_cols].to_numpy(dtype=np.float32)   # [N, C]
except (TypeError, ValueError) as exc:
    non_numeric = features[context_cols].select_dtypes(exclude=['number', 'bool']).columns.tolist()
    raise ValueError(
        f"Context columns must be numeric; cannot convert: {non_numeric}"
    ) from exc
shot_matrix = features[shot_cols].to_numpy(dtype=np.float32)             # [N, 3]

# Train/Val/Test split. test_size=0.3 followed by test_size=2/3 of the held-out
# part gives 70% train / 10% val / 20% test (not 60/20/20).
# Splitting row positions first lets the scaler be fitted on training rows only;
# with the same sample count and random_state this selects the same rows as
# splitting the tensors directly.
row_ids = np.arange(len(features), dtype=np.int64)
train_idx, temp_idx = train_test_split(row_ids, test_size=0.3, random_state=42)
val_idx, test_idx = train_test_split(temp_idx, test_size=2/3, random_state=42)

# Normalize the context features with statistics from the training rows only,
# so nothing about the validation / test rows leaks into the scaling.
# Shot ids are categorical codes and are left unscaled.
# The fitted scaler is needed again at inference time.
scaler = StandardScaler()
scaler.fit(context_matrix[train_idx])
context_scaled = scaler.transform(context_matrix).astype(np.float32)

# Build 3-step sequences: step k is [shot_k, *context]; the point-level context
# is repeated on every step.
X_sequences = np.concatenate(
    [shot_matrix[:, :, None], np.repeat(context_scaled[:, None, :], len(shot_cols), axis=1)],
    axis=2,
)                                                                        # [N, 3, 1 + C]
X_scaled = X_sequences

# Convert to tensors
X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
y_tensor = torch.tensor(target.values, dtype=torch.long)

train_rows = torch.from_numpy(train_idx)
temp_rows = torch.from_numpy(temp_idx)
val_rows = torch.from_numpy(val_idx)
test_rows = torch.from_numpy(test_idx)

X_train, y_train = X_tensor[train_rows], y_tensor[train_rows]
X_temp, y_temp = X_tensor[temp_rows], y_tensor[temp_rows]
X_val, y_val = X_tensor[val_rows], y_tensor[val_rows]
X_test, y_test = X_tensor[test_rows], y_tensor[test_rows]

print("✅ Final preprocessing complete.")
print(f"Train shape: {X_train.shape} — Target shape: {y_train.shape}")
print(f"Num classes in encoder: {len(label_encoder.classes_)} — Unique labels in target: {len(np.unique(y_train.numpy()))}")


In [None]:
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data import DataLoader, TensorDataset, WeightedRandomSampler
from collections import Counter 

# Pair each split's inputs with its labels so the loaders yield (x, y) batches.
train_dataset, val_dataset, test_dataset = (
    TensorDataset(xs, ys)
    for xs, ys in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

# --- Create DataLoaders ---
# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=64)
val_loader, test_loader = (
    DataLoader(dataset=split, shuffle=False, batch_size=64)
    for split in (val_dataset, test_dataset)
)

In [None]:
import torch.optim as optim
from sklearn.utils.class_weight import compute_class_weight

# Seed PyTorch so weight initialisation, dropout and the training loader's
# shuffling are repeatable across Restart & Run All.
SEED = 42
torch.manual_seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


model = CNNLSTMShotPredictor(
    num_classes=len(label_encoder.classes_),
    input_size=X_train.shape[2],     # features per timestep
    hidden_size=64,
    num_layers=1
).to(device)


criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0007, weight_decay=1e-5)
# Multiply the learning rate by 0.7 every 20 epochs.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.7)

# Training loop
num_epochs = 100

# Early stopping: stop after `patience` consecutive epochs in which the
# validation loss fails to improve by more than `min_delta`.
patience = 15
patience_counter = 0
min_delta = 0.001  # minimum improvement to reset patience
best_val_loss = float('inf')  # initialize with infinity
best_val_acc = 0  # validation accuracy at the best-loss epoch

checkpoint_path = "best_model.pt"
checkpoint_saved = False  # guards the reload below against a stale file on disk

for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    scheduler.step()

    # --- Evaluate on train set ---
    # Done in eval mode (dropout off), so it is comparable with validation.
    model.eval()
    correct_train = 0
    total_train = 0

    with torch.no_grad():
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            correct_train += (predicted == targets).sum().item()
            total_train += targets.size(0)

    train_acc = correct_train / total_train
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Train Accuracy: {train_acc:.4f}")

    # --- Evaluate on validation set ---
    correct_val = 0
    total_val = 0
    val_loss = 0.0

    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            val_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            correct_val += (predicted == targets).sum().item()
            total_val += targets.size(0)

    val_loss /= len(val_loader)
    val_acc = correct_val / total_val
    print(f"Validation Accuracy: {val_acc:.4f}, Validation Loss: {val_loss:.4f}")

    # --- Early stopping with min_delta ---
    if best_val_loss - val_loss > min_delta:
        best_val_loss = val_loss
        best_val_acc = val_acc
        patience_counter = 0
        torch.save(model.state_dict(), checkpoint_path)
        checkpoint_saved = True
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"⛔ Early stopping triggered. Best validation loss: {best_val_loss:.4f}")
            break

# The loop ends with the weights of the LAST epoch, which is up to `patience`
# epochs past the best one. Restore the checkpointed weights so that anything
# evaluated after this cell uses the best model.
if checkpoint_saved:
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    print(f"Restored best weights (val loss {best_val_loss:.4f}, val acc {best_val_acc:.4f})")




In [None]:
from sklearn.metrics import accuracy_score

# Score the model on the held-out test split.
model.eval()
all_preds, all_labels = [], []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        # The predicted class is the index of the largest logit; move it to the
        # CPU so it can be compared with the labels, which never left it.
        preds = outputs.argmax(dim=1).cpu()
        all_preds.extend(preds.numpy())
        all_labels.extend(labels.numpy())

accuracy = accuracy_score(y_true=all_labels, y_pred=all_preds)
print(f"Test Accuracy: {accuracy:.4f}")
