In [None]:
import os

# i want to read know the number of files in each subfolder of VB_DATA/extracted_features_blue_20

data_path = "../VB_DATA/extracted_features_positional_green_20_normalized/"
subfolders = [f.name for f in os.scandir(data_path) if f.is_dir()]
file_counts = {}

for f in subfolders:
    file_counts[f] = len(os.listdir(os.path.join(data_path, f)))

print ("File counts in each subfolder:")
for subfolder, count in file_counts.items():
    print(f"{subfolder}: {count}")  

In [None]:
import numpy as np
import pandas as pd
import ast
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras import layers, models
import tensorflow as tf

TARGET_FRAMES = 20
TARGET_SAMPLES = 138

# here in case we want to change the number of categories
"""
CATAGORIES = ['00_Short_Serve','01_Cross_Court_Flight','02_Lift','03_Tap_Smash','04_Block',
              '05_Drop_Shot','06_Push_Shot','07_Transitional_Slice','08_Cut','09_Rush_Shot',
              '10_Defensive_Clear','11_Defensive_Drive','12_Clear','13_Long_Serve','14_Smash',
              '15_Flat_Shot','16_Rear_Court_Flat_Drive','17_Short_Flat_Shot']
"""
"""
00_Short_Serve: 816
02_Lift: 738
04_Block: 289
05_Drop_Shot: 765
06_Push_Shot: 351
08_Cut: 552
12_Clear: 939
13_Long_Serve: 987
14_Smash: 284
15_Flat_Shot: 267
"""
CATEGORIES = ['00_Short_Serve','02_Lift','04_Block',
              '05_Drop_Shot','06_Push_Shot','08_Cut',
              '12_Clear','13_Long_Serve','14_Smash',
              '15_Flat_Shot']


def load_data(base_dir):
    X, y = [], []
    
    for idx, cat in enumerate(CATEGORIES):
        cat_dir = os.path.join(base_dir, cat)
        
        num_samples = 0
        frames = []
        ids = []
        for fname in os.listdir(cat_dir):
            # i want the first 100 files in each category
            
            if not fname.endswith('.csv'):
                continue
            df = pd.read_csv(os.path.join(cat_dir, fname))
            if 'Frame' in df.columns:
                df = df.drop(columns=['Frame'])
            print(df)
            arr = df.values.tolist()
            print(arr)
            if len(arr) < TARGET_FRAMES:
                continue
            
            """if num_samples == TARGET_SAMPLES:
                break
            else: 
                num_samples += 1    
            """ 
                       
            """print(arr)
            X.append(arr)
            y.append(idx)"""
            frames.append(arr)
            ids.append(idx)
    
        # generate TARGET_SAMPLES random numbers from 0 to len(frames)-1
        random_indices = np.random.choice(len(frames), TARGET_SAMPLES, replace=False)

        for i in random_indices:
            X.append(frames[i])
            y.append(ids[i])

    X = np.stack(X, axis=0)
    
    y = keras.utils.to_categorical(y, num_classes=len(CATEGORIES))
    return X, y, CATEGORIES

In [None]:
import os
import ast
import numpy as np
import pandas as pd
from itertools import product
from sklearn.manifold import MDS
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Masking
from tensorflow.keras.optimizers import Adam

# ─────────────────────────────────────────────────
# 1) Load your labels and precomputed distance matrix
# ─────────────────────────────────────────────────
df = pd.read_csv('../synthetic_data/synthetic_dataset_30deg.csv')
labels = df['point_name'].values
n = len(labels)

# Suppose you’ve already computed these:
#   D_geo, D_sum, D_mean  (each is an n×n numpy array)
# For this example we’ll use D_geo:
# D_geo = np.load('D_geo.npy')

# Suppose `labels` is a list of all label strings,
# and `coords2`, `coords3` are dicts mapping each label -> (x2,y2) and -> (x3,y3).


# load the labels and the coordinates from file
df = pd.read_csv('../synthetic_data/synthetic_dataset_30deg.csv')
labels = df['point_name'].values
coords2 = {row['point_name']: (ast.literal_eval(row['point2'])) for _, row in df.iterrows()}
coords3 = {row['point_name']: (ast.literal_eval(row['point3'])) for _, row in df.iterrows()}

def euclid(a, b):
    return np.hypot(a[0]-b[0], a[1]-b[1])

# Preallocate distance matrices
n = len(labels)
D_geo, D_sum, D_mean = np.zeros((n,n)), np.zeros((n,n)), np.zeros((n,n))

for i, j in product(range(n), range(n)):
    lab_i, lab_j = labels[i], labels[j]
    d2 = euclid(coords2[lab_i], coords2[lab_j])
    d3 = euclid(coords3[lab_i], coords3[lab_j])

    D_geo[i,j]  = np.sqrt(d2 * d3)
    D_sum[i,j]  = d2 + d3
    D_mean[i,j] = 0.5 * (d2 + d3)


# ─────────────────────────────────────────────────
# 2) Turn distances into an embedding via MDS
# ─────────────────────────────────────────────────
mds = MDS(n_components=4, dissimilarity='precomputed', random_state=42)
embeddings = mds.fit_transform(D_mean)  
# embeddings.shape == (n, 4)

# Create a lookup from code → vector
code2emb = {lab: embeddings[i] for i, lab in enumerate(labels)}

# ─────────────────────────────────────────────────
# 3) Build your sequence dataset
# ─────────────────────────────────────────────────
DATA_DIR = "../VB_DATA/extracted_features_positional_green_20_normalized"
CATEGORIES = ['00_Short_Serve','02_Lift',
              '05_Drop_Shot','08_Cut',
              '12_Clear','13_Long_Serve','14_Smash',
              '15_Flat_Shot']

all_seqs, all_labels = [], []
for folder in CATEGORIES:
    folder_path = f"{DATA_DIR}/{folder}"
    label = folder.split('_',1)[1]
    for fname in sorted(os.listdir(folder_path)):
        if not fname.endswith('.csv'): continue
        df = pd.read_csv(f"{folder_path}/{fname}")
        if 'Frame' in df: df = df.drop(columns=['Frame'])
        # df.values is shape (frames, points)
        seq_codes = df.values.astype(str)
        # map each code to its embedding vector
        # result shape: (frames, points, emb_dim)
        seq_vecs = np.stack([
            np.stack([code2emb[c] for c in row], axis=0)
            for row in seq_codes
        ], axis=0)
        # flatten points into features: (frames, points*emb_dim)
        T, P, E = seq_vecs.shape
        seq_flat = seq_vecs.reshape(T, P*E)
        all_seqs.append(seq_flat)
        all_labels.append(label)

# pad or truncate to fixed length T=20
from tensorflow.keras.preprocessing.sequence import pad_sequences
X = pad_sequences(all_seqs, maxlen=20, dtype='float32', padding='post', truncating='post')
# encode labels
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
y = le.fit_transform(all_labels)
y = np.eye(len(le.classes_))[y]   # one-hot


In [None]:
import kerastuner as kt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Masking, LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam

# ─────────────────────────────────────────────────
# 1) (Re)load & preprocess your data
# ─────────────────────────────────────────────────
# — load your sequences and labels as before —
# For brevity, here we assume X_all (N, T, F) and y_all (one-hot, N, C) are ready.
# If not, build them exactly as in the MDS+LSTM example.

# Split into train/test
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# ─────────────────────────────────────────────────
# 2) Define a model-building function for Keras Tuner
# ─────────────────────────────────────────────────
def build_model(hp):
    model = Sequential()
    
    # Optional masking if you padded with zeros
    model.add(Masking(mask_value=0.0, input_shape=X_train.shape[1:]))
    
    # Tune number of LSTM layers (1–2) and their units (32–256)
    for i in range(hp.Int('num_lstm_layers', 1, 2)):
        return_seq = (i < hp.get('num_lstm_layers') - 1)
        units = hp.Int(f'lstm_units_{i}', min_value=32, max_value=256, step=32)
        model.add(LSTM(units, return_sequences=return_seq))
        # Optionally add dropout after each LSTM
        model.add(Dropout(hp.Float(f'dropout_{i}', 0.0, 0.5, step=0.1)))
    
    # Final Dense for classification
    model.add(Dense(y_train.shape[1], activation='softmax'))
    
    # Tune the learning rate for Adam
    lr = hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])
    # Add batch_size as a hyperparameter
    hp.Choice('batch_size', [16, 32, 64])
    model.compile(
        optimizer=Adam(learning_rate=lr),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

# ─────────────────────────────────────────────────
# 3) Instantiate the tuner
# ─────────────────────────────────────────────────
tuner = kt.Hyperband(
    build_model,
    objective='val_accuracy',
    max_epochs=30,
    factor=3,
    directory='kt_dir',
    project_name='lstm_sequence_tuning'
)

# ─────────────────────────────────────────────────
# 4) Add a callback to stop early when no improvement
# ─────────────────────────────────────────────────
stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

# ─────────────────────────────────────────────────
# 5) Run the hyperparameter search
tuner.search(
    X_train, y_train,
    epochs=50,
    validation_data=(X_val, y_val),
    callbacks=[stop_early],
    batch_size=kt.HyperParameters().Choice('batch_size', [16, 32, 64])
)

# ─────────────────────────────────────────────────
# 6) Retrieve the best model and hyperparameters
# ─────────────────────────────────────────────────
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print("Best hyperparameters:")
print(f"  # LSTM layers: {best_hps.get('num_lstm_layers')}")


# Build the best model and train it fully
model = tuner.hypermodel.build(best_hps)
history = model.fit(
    X_train, y_train,
    epochs=50,
    validation_data=(X_val, y_val),
    batch_size=best_hps.get('batch_size'),
    callbacks=[stop_early]
)

# ─────────────────────────────────────────────────
# 7) Evaluate on hold-out test set (if you held one out)
# ─────────────────────────────────────────────────
loss, acc = model.evaluate(X_test, y_test)
print(f"Test accuracy: {acc:.4f}")

In [None]:
# i want the confusion matrix for this analysis
from sklearn.metrics import confusion_matrix
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_test, axis=1)
cm = confusion_matrix(y_true_classes, y_pred_classes, labels=np.arange(len(le.classes_)))
cm_df = pd.DataFrame(cm, index=le.classes_, columns=le.classes_)
print("\nConfusion Matrix:")
print(cm_df)

# also print accuracy by class and overall accuracy
from sklearn.metrics import accuracy_score
class_accuracies = cm.diagonal() / cm.sum(axis=1)
overall_accuracy = accuracy_score(y_true_classes, y_pred_classes)
print("\nClass Accuracies:")
for cls, acc in zip(le.classes_, class_accuracies):
    print(f"{cls}: {acc:.4f}")
print(f"\nOverall Accuracy: {overall_accuracy:.4f}")        

# can I also get the accuracy for the top 3 predictions
from sklearn.metrics import top_k_accuracy_score
top_k_acc = top_k_accuracy_score(y_true_classes, y_pred, k=3)
print(f"\nTop-3 Accuracy: {top_k_acc:.4f}")



In [None]:
%pip install shap

import shap
import numpy as np
import tensorflow as tf

# assume `model` is your trained keras Sequential model
# and X_sample is an array of shape (M, T, F), e.g. a small background set
# and X_test is your test set of shape (N, T, F)

# 1.1 Select a small background set (e.g. 100 examples) for KernelExplainer
background = X_train[np.random.choice(len(X_train), 100, replace=False)]

# 1.2 Create a wrapper that takes 2D inputs for KernelExplainer
def model_predict(x_flat):
    # x_flat comes in shape (K, T*F)
    x = x_flat.reshape(-1, X_train.shape[1], X_train.shape[2])
    return model.predict(x)

# flatten background
bg_flat = background.reshape(-1, X_train.shape[1]*X_train.shape[2])

# 1.3 Build a KernelExplainer
explainer = shap.KernelExplainer(model_predict, bg_flat)

# 1.4 Pick one sequence to explain (flattened)
idx = 0
x_to_explain = X_test[idx:idx+1]
x_flat = x_to_explain.reshape(1, -1)

# 1.5 Compute SHAP values
shap_values = explainer.shap_values(x_flat, nsamples=200)

# shap_values is a numpy array of shape (1, T, F) for a single sample
# so just use shap_values[0] for the first (and only) sample
sv = shap_values[0]  # shape (T, F)

# 1.6 Sum over time to get overall feature importances:
feature_importance = np.sum(sv, axis=0)  # shape (F,)
# or sum over features to see which time‐steps were most important:
time_importance    = np.sum(sv, axis=1)  # shape (T,)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import shap

# —————————————————————————————————————————————
# 1) Compute SHAP values for all test samples
# —————————————————————————————————————————————
# Flatten test set
N, T, F = X_test.shape
X_flat = X_test.reshape(N, T*F)

# Explainer (built once on background)
explainer = shap.KernelExplainer(model_predict, bg_flat)

# Compute shap_values for each class: returns list of arrays, each (N, T*F)
shap_values = explainer.shap_values(X_flat, nsamples=200)

# —————————————————————————————————————————————
# 2) Average SHAP values across all test samples
# —————————————————————————————————————————————
# shap_values shape: (N, T, F)
mean_shap = shap_values.mean(axis=0)  # shape (T, F)

# —————————————————————————————————————————————
# 3) Plot a single heatmap for mean SHAP values
# —————————————————————————————————————————————
feature_names = [f"feat_{i}" for i in range(F)]
time_steps    = np.arange(T)

plt.figure(figsize=(8, 4))
plt.imshow(mean_shap, aspect='auto', cmap='seismic', extent=[0, F, T, 0])
plt.colorbar(label="SHAP value")
plt.xlabel("Feature index")
plt.ylabel("Time step")
plt.title("Avg SHAP contributions over time (all classes)")
plt.yticks(time_steps)
plt.xticks(np.arange(F), feature_names, rotation=90)
plt.tight_layout()
plt.show()


In [39]:
plt.show()
# the plt is not showing, just prin it
print("Mean SHAP values shape:", mean_shap.shape)
print(mean_shap)

#save mean_shap to a csv file
# Ensure the number of columns matches the shape of mean_shap
if mean_shap.shape[1] == len(feature_names):
	mean_shap_df = pd.DataFrame(mean_shap, columns=feature_names)
else:
	mean_shap_df = pd.DataFrame(mean_shap, columns=[f"feat_{i}" for i in range(mean_shap.shape[1])])
mean_shap_df.to_csv('mean_shap_values.csv', index=False)


Mean SHAP values shape: (480, 8)
[[-3.48753420e-04 -3.70426531e-05  2.67150204e-04 ... -1.98318436e-03
  -1.30213883e-03  2.70476509e-05]
 [ 1.09499303e-04  2.02984236e-04  6.04509663e-05 ...  1.35791628e-04
   2.75835503e-04 -3.17929663e-05]
 [-1.45400241e-04  2.83693683e-05  4.26753882e-04 ... -2.87495097e-04
   9.08930951e-05 -5.68354808e-05]
 ...
 [ 8.82081558e-05  1.82199871e-04  3.81624725e-04 ... -2.78686680e-05
   2.28735362e-04  1.39736037e-05]
 [-1.82134874e-05 -5.49932982e-06  8.22522330e-05 ... -1.73052548e-04
  -1.31608406e-04 -5.34780519e-05]
 [-1.85229003e-04  8.00747688e-05  4.57848360e-04 ... -2.52525410e-04
  -2.22214726e-04  1.02304938e-04]]


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import shap

# —————————————————————————————————————————————
# 1) Flatten and explain all your test set
# —————————————————————————————————————————————
N, T, F = X_test.shape
X_flat = X_test.reshape(N, T*F)

# Use your existing KernelExplainer (built on bg_flat)
# and your model_predict wrapper
shap_values = explainer.shap_values(X_flat, nsamples=200)  
# shap_values is a list of length C, each array (N, T*F)

# —————————————————————————————————————————————
# 2) Compute per‐time contributions per class
# —————————————————————————————————————————————
num_classes = len(shap_values)
time_contrib = np.zeros((num_classes, X_test.shape[1]))

for c in range(num_classes):
    N = shap_values[c].shape[0]
    TF = shap_values[c].shape[1]
    T = X_test.shape[1]
    if TF % T != 0 or T == 0:
        print(f"Warning: For class {c}, cannot reshape ({N},{TF}) to ({N},{T},F). Skipping.")
        continue
    F = TF // T
    if F == 0:
        print(f"Warning: Computed F=0 for class {c}, skipping.")
        continue
    sv_c = shap_values[c].reshape(N, T, F)
    # sum over features → (N, T)
    time_per_sample = sv_c.sum(axis=2)
    # average across all samples in class c (you could also filter by true label)
    time_contrib[c] = time_per_sample.mean(axis=0)

# —————————————————————————————————————————————
# 3) Plot temporal profiles
# —————————————————————————————————————————————
for c in range(num_classes):
    plt.figure(figsize=(6,2.5))
    plt.plot(range(T), time_contrib[c], marker='o')
    plt.title(f"Mean SHAP Time-Step Contribution — Class {c}")
    plt.xlabel("Time Step")
    plt.ylabel("Sum of SHAP Values")
    plt.grid(True)
    plt.tight_layout()
    plt.show()
