# Basketball Activity Recognition using PyTorch LSTM Neural Network

# Reading the dataset

In [None]:
import pandas as pd
import numpy as np
import glob
import os

from torch.utils.data import DataLoader

from sklearn.metrics import precision_score, recall_score, f1_score, jaccard_score,confusion_matrix, plot_confusion_matrix
import time

import torch
import torch.nn as nn


from sklearn.metrics import f1_score, confusion_matrix, plot_confusion_matrix
# Load every recording CSV of the study and merge them into one frame.
files = glob.glob('/data/shk/dl-for-har/tutorial_notebooks/Prof_New/*.csv')

# os.path.basename copes with the platform's path separator, unlike split('/').
name = [os.path.basename(file) for file in files]
df = pd.concat(map(pd.read_csv, files), ignore_index=True)  # 4230213 rows × 10 columns

data = df

# Considering only basketball: keep rows that carry a real basketball label.
# Both conditions are evaluated on the same frame, so the boolean mask is
# aligned with the rows it filters (no implicit re-indexing), and .copy()
# makes clean_data an independent frame that can be modified safely.
has_basketball_label = (data['basketball'] != 'not_labeled') & data['basketball'].notna()
clean_data = data[has_basketball_label].copy()
# Replacing jumping by layup. The result is assigned back to the column; an
# in-place replace on a column selection may only change a temporary object.
clean_data['basketball'] = clean_data['basketball'].replace({'jumping': 'layup'})

# Everything that is not a game recording is used for training/validation.
new_data = clean_data[clean_data.coarse != 'game']  # 623758 rows × 10 columns
new_data = new_data.drop(columns='coarse')
new_data = new_data.iloc[:, :6]  # keep the first six remaining columns
print('new_data', new_data.shape)
print('____')

# Separating game data from train data
game_data = clean_data[clean_data.coarse == 'game']
game_data = game_data.drop(columns='coarse')
game_data = game_data.iloc[:, :6]  # 45446 rows × 6 columns
print('game_data', game_data.shape)

# Labelling the Data

In [None]:
# Label column of the training frame, with per-class row counts and the
# sorted list of distinct class names.
label = new_data['basketball']
y_axis = label.value_counts()
x_axis = sorted(label.unique())

# Every column except the last one (the label) is kept as input.
X = new_data.iloc[:, :-1]  # .astype(np.float32)

# Class order defines the integer ids used further down (index == id).
class_names = [
    'dribbling',
    'shot',
    'pass',
    'layup',
    'rebound',
]
num_classes = len(class_names)

def labelling(clean_data, data_y):
    """Translate basketball activity names into integer class ids.

    The mapping is dribbling=0, shot=1, pass=2, layup=3, rebound=4. Values
    that are not one of these names (for example ids that were already
    encoded) are passed through unchanged.

    The previous implementation assigned ``clean_data[data_y == name] = id``,
    which overwrites *every* column of the matching rows (not only the label)
    and only produced encoded labels when ``data_y`` happened to be a view on
    ``clean_data``. This version does not modify either argument.

    Args:
        clean_data: Frame the labels belong to. Kept for call compatibility;
            it is neither read nor modified.
        data_y: pandas Series holding one activity name per row.

    Returns:
        A new Series with the same index as ``data_y`` holding the class ids.
    """
    class_ids = {'dribbling': 0, 'shot': 1, 'pass': 2, 'layup': 3, 'rebound': 4}
    return data_y.map(lambda value: class_ids.get(value, value))

# Integer class id for every row of new_data (the label is its last column).
y = labelling(new_data, new_data.iloc[:,-1]).astype(int)

#subject = X['subject']
#uni_subject = subject.unique()
#uni_subject_count = subject.value_counts()

# Append the class ids as the last column, then drop the first column.
# np.asarray(y) gives a plain 1-D array first: indexing a pandas Series with
# y[:, None] is not supported by current pandas releases.
# new_data has 6 columns, so 5 remain here: 4 input columns + class id.
data_labelled = np.concatenate((X, np.asarray(y)[:, None]), axis=1)[:, 1:]


# Special preprocessing

In [None]:
# Split the labelled array into one sub-array per class; the class id sits
# in the last column (0=dribbling, 1=shot, 2=pass, 3=layup, 4=rebound).
_class_column = data_labelled[:, -1]
data_bribbling = data_labelled[_class_column == 0]
data_shot = data_labelled[_class_column == 1]
data_pass = data_labelled[_class_column == 2]
data_layup = data_labelled[_class_column == 3]
data_rebound = data_labelled[_class_column == 4]


def spilt(data):
    """Randomly divide the rows of ``data`` 80/20 into train and validation.

    The rows are partitioned with ``torch.utils.data.random_split``. From each
    part, columns 0-3 are returned as inputs and column 4 as target.

    NOTE(review): the rows of both parts come back in the random order chosen
    by random_split, so consecutive rows are no longer consecutive samples --
    confirm this is intended before windowing the result.

    Args:
        data: 2-D array-like with at least five columns.

    Returns:
        Tuple ``(X, Y, X_v, y_v)``: train inputs, train targets, validation
        inputs, validation targets.
    """
    rows = np.asarray(data)
    n_train = int(0.8 * len(rows))
    n_valid = len(rows) - n_train

    train_part, valid_part = torch.utils.data.random_split(rows, [n_train, n_valid])

    # Indexing a Subset with a full slice materialises its rows as an array.
    train_rows = train_part[:]
    valid_rows = valid_part[:]

    return train_rows[:, 0:4], train_rows[:, 4], valid_rows[:, 0:4], valid_rows[:, 4]

# Split every class on its own so that each class contributes 80 % of its
# rows to the training part and 20 % to the validation part.
X_dribble, Y_dribble, X_v_dribble, y_v_dribble = spilt(data_bribbling)
X_shot, Y_shot, X_v_shot, y_v_shot = spilt(data_shot)
X_pass, Y_pass, X_v_pass, y_v_pass = spilt(data_pass)
X_layup, Y_layup, X_v_layup, y_v_layup = spilt(data_layup)
X_rebound, Y_rebound, X_v_rebound, y_v_rebound = spilt(data_rebound)

# Stack the per-class parts in class order; [:, 1:] drops the first of the
# four input columns, leaving three.
_train_inputs = [X_dribble, X_shot, X_pass, X_layup, X_rebound]
_train_targets = [Y_dribble, Y_shot, Y_pass, Y_layup, Y_rebound]
X = np.concatenate(_train_inputs, axis=0)[:, 1:]
Y = np.concatenate(_train_targets, axis=0)

_valid_inputs = [X_v_dribble, X_v_shot, X_v_pass, X_v_layup, X_v_rebound]
_valid_targets = [y_v_dribble, y_v_shot, y_v_pass, y_v_layup, y_v_rebound]
X_v = np.concatenate(_valid_inputs, axis=0)[:, 1:]
y_v = np.concatenate(_valid_targets, axis=0)

# Inputs and target side by side, target in the last column.
train_data = np.column_stack((X, Y))
valid_data = np.column_stack((X_v, y_v))

# Sliding Window

In [None]:
def sliding_window(data, samples_per_window, overlap_ratio):
    """Cut ``data`` into fixed-length, optionally overlapping windows.

    Windows start at 0 and advance by ``win_len - overlapping_elements``
    samples. Only complete windows are produced, including the one that ends
    exactly at the last sample (the earlier loop condition dropped it).

    Args:
        data: Array-like whose first axis is the sample axis.
        samples_per_window: Window length in samples (converted with int()).
        overlap_ratio: Overlap between consecutive windows in percent of the
            window length, or None for no overlap.

    Returns:
        Tuple ``(windows, indices)``. ``windows`` has shape
        ``(n_windows, win_len, *data.shape[1:])``; ``indices`` has shape
        ``(n_windows, 2)`` and holds ``[start, end)`` of every window. Both
        keep these shapes (with ``n_windows == 0``) when no window fits.

    Raises:
        ValueError: If the window length is not positive or the overlap
            covers the whole window (the window would never advance).
    """
    win_len = int(samples_per_window)
    if win_len <= 0:
        raise ValueError('Window size must be a positive number of samples.')

    # None means "no overlap"; previously this left the overlap undefined.
    if overlap_ratio is None:
        overlapping_elements = 0
    else:
        overlapping_elements = int((overlap_ratio / 100) * win_len)
    if overlapping_elements >= win_len:
        raise ValueError('Number of overlapping elements exceeds window size.')

    values = np.asarray(data)
    step = win_len - overlapping_elements
    # "+ 1" keeps the window whose end coincides with the end of the data.
    starts = range(0, len(values) - win_len + 1, step)
    indices = [[start, start + win_len] for start in starts]

    if not indices:
        empty_windows = np.empty((0, win_len) + values.shape[1:], dtype=values.dtype)
        return empty_windows, np.empty((0, 2), dtype=int)

    result_windows = np.stack([values[start:end] for start, end in indices])
    result_indices = np.array(indices)
    return result_windows, result_indices

def apply_sliding_window(data_x, data_y, sliding_window_size, sampling_rate, sliding_window_overlap):
    """Window inputs and targets with identical window settings.

    Args:
        data_x: Input samples, passed to ``sliding_window``.
        data_y: Target samples, passed to ``sliding_window``.
        sliding_window_size: Window length in samples.
        sampling_rate: Accepted but not used by this function.
        sliding_window_overlap: Overlap in percent of the window length.

    Returns:
        Tuple ``(output_x, output_y)`` with the windows of both inputs; the
        index arrays returned by ``sliding_window`` are discarded.
    """
    windowed = [
        sliding_window(series, sliding_window_size, sliding_window_overlap)[0]
        for series in (data_x, data_y)
    ]
    return windowed[0], windowed[1]

# Window length in samples and overlap in percent of the window length.
sw_length = 50
sw_overlap = 25

# Training and validation streams are windowed with the same settings.
_window_options = dict(sampling_rate=50, sliding_window_overlap=sw_overlap)
X_train, y_train = apply_sliding_window(X, Y, sw_length, **_window_options)
X_valid, y_valid = apply_sliding_window(X_v, y_v, sw_length, **_window_options)

# float32 inputs for the network, small unsigned ints for the class ids.
X_train = X_train.astype(np.float32)
y_train = y_train.astype(np.uint8)
X_valid = X_valid.astype(np.float32)
y_valid = y_valid.astype(np.uint8)

print("\nShape of the X_train and Y_train datasets after windowing: ")
print(X_train.shape, y_train.shape) 

print("\nShape of the X_valid and y_valid datasets after windowing: ")
print(X_valid.shape, y_valid.shape)

# One target per window: the class id of the window's first sample.
y_train = y_train[:, 0]
y_valid = y_valid[:, 0]

# Training and validation windows stacked into one array each.
data_X = np.concatenate((X_train, X_valid), axis=0)
data_Y = np.concatenate((y_train, y_valid), axis=0)

print("\nShape of the X after windowing: ")
print(data_X.shape)

print("\nShape of the Y after windowing: ")
print(data_Y.shape)

# Network

In [None]:
class Network(nn.Module):
    """Two stacked bidirectional LSTM blocks followed by a dense classifier.

    The output of the second LSTM block is flattened over time and features
    and passed through four linear layers that end in 5 class scores.

    Args:
        input_dim: Number of features per time step.
        hidden_size: Hidden units per direction in both LSTM blocks.
        num_layers: Number of layers inside each LSTM block.
        seq_len: Number of time steps per input window. When given, the first
            linear layer is sized as ``2 * hidden_size * seq_len``. When left
            at None the historical fixed size ``2 * 1300`` is used, which
            matches hidden_size=26 with 50-step windows.
    """

    def __init__(self, input_dim , hidden_size , num_layers, seq_len=None):
        super().__init__()
        self.num_layers = num_layers
        self.input_size = input_dim
        self.hidden_size = hidden_size
        self.relu = nn.ReLU(inplace=True)

        self.lstm = nn.LSTM(input_size=input_dim, hidden_size=hidden_size, num_layers=num_layers,
                            batch_first=True, dropout=0.1, bidirectional=True)

        # The first block is bidirectional, so it emits 2 * hidden_size features.
        self.lstm1 = nn.LSTM(input_size=hidden_size*2, hidden_size=hidden_size, num_layers=num_layers,
                            batch_first=True, dropout=0.1, bidirectional=True)
        # NOTE(review): dropout1 and dropout2 are never used in forward();
        # they are kept so existing attribute access keeps working.
        self.dropout1 = nn.Dropout(p=0.1)
        # Flattened size of the second LSTM output: time steps * 2 directions
        # * hidden units. Falls back to the previous constant when seq_len is
        # not supplied so existing checkpoints and callers are unaffected.
        flat_features = 2 * 1300 if seq_len is None else 2 * hidden_size * seq_len
        self.linear1 = nn.Linear(flat_features, 250)
        self.linear2 = nn.Linear(250, 500)
        self.linear3 = nn.Linear(500, 130)
        self.dropout2 = nn.Dropout(p=0.2)
        self.dropout3 = nn.Dropout(p=0.3)
        self.linear5 = nn.Linear(130, 5)

    def forward(self, x):
        """Return class scores of shape (batch, 5) for x of shape (batch, time, input_dim)."""
        x, _ = self.lstm(x)
        x, _ = self.lstm1(x)
        x = self.relu(x)
        # Keep the batch dimension fixed and flatten time and features, so a
        # window of unexpected length fails in linear1 instead of silently
        # changing the batch size.
        x = x.reshape(x.shape[0], -1)
        x = self.relu(self.linear1(x))
        x = self.relu(self.linear2(x))
        # NOTE(review): dropout3 is applied twice; dropout2 may have been
        # intended for one of the two -- left unchanged.
        x = self.dropout3(x)
        x = self.relu(self.linear3(x))
        x = self.dropout3(x)
        x = self.linear5(x)
        return x
    
    
# Model hyper-parameters: features per time step, hidden units per LSTM
# direction, and layers per LSTM block.
inp_dim, hd, nl = 3, 26, 3

torch_model = Network(input_dim=inp_dim, hidden_size=hd, num_layers=nl)
# Start in training mode (dropout active).
torch_model.train()

# Data loading

In [None]:
# Loss and optimiser for the classifier.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    torch_model.parameters(),
    lr=1e-5,
    weight_decay=1e-4,
)

# Training windows, served in shuffled mini-batches of 12.
_train_inputs_t = torch.from_numpy(X_train).float()
_train_targets_t = torch.from_numpy(y_train)
train_dataset = torch.utils.data.TensorDataset(_train_inputs_t, _train_targets_t)
trainloader = DataLoader(train_dataset, batch_size=12, shuffle=True)

# Validation windows, served the same way.
_valid_inputs_t = torch.from_numpy(X_valid).float()
_valid_targets_t = torch.from_numpy(y_valid)
test_dataset = torch.utils.data.TensorDataset(_valid_inputs_t, _valid_targets_t)
valloader = DataLoader(test_dataset, batch_size=12, shuffle=True)

# Training

In [None]:
num_epochs = 250
# Number of batches between two progress lines.
log_interval = 6000
# torch.save does not create missing directories, so make sure it exists.
checkpoint_dir = './Pre_Trained_models'
os.makedirs(checkpoint_dir, exist_ok=True)

for e in range(num_epochs):

    # The validation pass at the end of every epoch switches the model to
    # eval mode. Switch back here, otherwise every epoch after the first one
    # trains with dropout disabled.
    torch_model.train()

    train_losses = []
    # Per-batch results are collected in lists and joined once per epoch;
    # concatenating arrays inside the batch loop is quadratic.
    train_pred_batches = []
    train_gt_batches = []
    start_time = time.time()

    for batch_num, (inputs, targets) in enumerate(trainloader, start=1):

        optimizer.zero_grad()

        train_output = torch_model(inputs)
        loss = criterion(train_output, targets.long())

        loss.backward()
        optimizer.step()

        train_losses.append(loss.item())

        # Predicted class = index of the largest score.
        y_preds = np.argmax(train_output.cpu().detach().numpy(), axis=-1)
        y_true = targets.cpu().numpy()

        train_pred_batches.append(y_preds)
        train_gt_batches.append(y_true)

        if batch_num % log_interval == 0:
            cur_loss = np.mean(train_losses)
            elapsed = time.time() - start_time
            # elapsed covers log_interval batches since the last report.
            print('| epoch {:3d} | {:5d} batches | ms/batch {:5.2f} | train loss {:5.2f}'.format(e, batch_num,
                                                                                                         elapsed * 1000 / log_interval, cur_loss))
            start_time = time.time()

    train_preds = np.concatenate(train_pred_batches).astype(int)
    train_gt = np.concatenate(train_gt_batches).astype(int)

    # NOTE: the value printed as "accuracy" is the macro Jaccard score.
    acc = jaccard_score(train_gt, train_preds, average='macro')
    pre = precision_score(train_gt, train_preds, average='macro', zero_division=0)
    reca = recall_score(train_gt, train_preds, average='macro', zero_division=0)
    f1 = f1_score(train_gt, train_preds, average='macro', zero_division=0)

    print("___Training____")
    print(f"Epoch {e},accuracy {round(acc,3)*100} ,precision {round(pre,3)*100}, recall {round(reca,3)*100}, f1_score {round(f1,3)*100} ")

    val_pred_batches = []
    val_gt_batches = []
    val_losses = []

    # Validation: eval mode (dropout off) and no gradient tracking.
    torch_model.eval()
    with torch.no_grad():
        for inputs, targets in valloader:
            # send inputs through network to get predictions
            val_output = torch_model(inputs)

            val_loss = criterion(val_output, targets.long())
            val_losses.append(val_loss.item())

            val_pred_batches.append(np.argmax(val_output.cpu().numpy(), axis=-1))
            val_gt_batches.append(targets.cpu().numpy())

    val_preds = np.concatenate(val_pred_batches).astype(int)
    val_gt = np.concatenate(val_gt_batches).astype(int)

    acc_test = jaccard_score(val_gt, val_preds, average='macro')
    pre_test = precision_score(val_gt, val_preds, average='macro', zero_division=0)
    reca_test = recall_score(val_gt, val_preds, average='macro', zero_division=0)
    f1_test = f1_score(val_gt, val_preds, average='macro', zero_division=0)

    print("___Testing____")
    print(f"accuracy {round(acc_test,3)*100} ,precision {round(pre_test,3)*100}, recall {round(reca_test,3)*100}, f1_score {round(f1_test,3)*100} ")

    # One checkpoint per epoch.
    torch.save(torch_model.state_dict(), './Pre_Trained_models/epoch-{}.pt'.format(e))


# Predictions

In [None]:
# Class ids 0..4 and their names, in id order.
cls = np.arange(5)
class_names = ['dribbling', 'shot', 'pass', 'layup', 'rebound']

# Macro-averaged scores on the training predictions of the last epoch.
# The value printed as "Accuracy" is the Jaccard score.
print('\nTraining RESULTS: ')
for caption, metric in (
    ("\nAvg. Accuracy: {0}", jaccard_score),
    ("Avg. Precision: {0}", precision_score),
    ("Avg. Recall: {0}", recall_score),
    ("Avg. F1: {0}", f1_score),
):
    print(caption.format(metric(train_gt, train_preds, average='macro')))

# The same four scores, one value per class.
print("\nTraining RESULTS (PER CLASS): ")
for heading, metric in (
    ("\nAccuracy:", jaccard_score),
    ("\nPrecision:", precision_score),
    ("\nRecall:", recall_score),
    ("\nF1:", f1_score),
):
    print(heading)
    per_class = metric(train_gt, train_preds, average=None, labels=cls)
    for class_name, rslt in zip(class_names, per_class):
        print("   {0}: {1} %".format(class_name, rslt*100))

# Validation Accuracies

In [None]:
# The training loop leaves the last epoch's validation labels/predictions in
# val_gt / val_preds. test_gt / test_preds are not assigned by the preceding
# cells, so bind them here; without this the cell fails with a NameError.
test_gt, test_preds = val_gt, val_preds

# (caption suffix, metric) pairs shared by the sections below. The value
# reported as "Accuracy" is the Jaccard score.
_metrics = (
    ("Accuracy", jaccard_score),
    ("Precision", precision_score),
    ("Recall", recall_score),
    ("F1", f1_score),
)

print('\nValidation RESULTS: ')
print("\nAvg. Accuracy: {0}".format(jaccard_score(test_gt, test_preds, average='macro')))
print("Avg. Precision: {0}".format(precision_score(test_gt, test_preds, average='macro')))
print("Avg. Recall: {0}".format(recall_score(test_gt, test_preds, average='macro')))
print("Avg. F1: {0}".format(f1_score(test_gt, test_preds, average='macro')))

print("\nVALIDATION RESULTS (PER CLASS): ")
for metric_name, metric in _metrics:
    print("\n{0}:".format(metric_name))
    for i, rslt in enumerate(metric(test_gt, test_preds, average=None, labels=cls)):
        print(f'{class_names[i]}: {round(rslt,2)*100}')

# Difference between the macro scores on training and validation data.
print("\nGENERALIZATION GAP ANALYSIS: ")
print("\nTrain-Val-Accuracy Difference: {0}".format(jaccard_score(train_gt, train_preds, average='macro') -
                                                  jaccard_score(test_gt, test_preds, average='macro')))
print("Train-Val-Precision Difference: {0}".format(precision_score(train_gt, train_preds, average='macro') -
                                                   precision_score(test_gt, test_preds, average='macro')))
print("Train-Val-Recall Difference: {0}".format(recall_score(train_gt, train_preds, average='macro') -
                                                recall_score(test_gt, test_preds, average='macro')))
print("Train-Val-F1 Difference: {0}".format(f1_score(train_gt, train_preds, average='macro') -
                                            f1_score(test_gt, test_preds, average='macro')))

# Testing on Game data

In [None]:
print('game_data',game_data.shape)

# Label column of the game recordings with its per-class row counts and the
# sorted list of distinct values.
label_game = game_data['basketball']
y_axis_game = label_game.value_counts()
x_axis_game = sorted(label_game.unique())

# Every column except the last one (the label).
X_game = game_data.iloc[:, :-1]

def labelling(clean_data, data_y):
    """Translate basketball activity names into integer class ids.

    The mapping is dribbling=0, shot=1, pass=2, layup=3, rebound=4. Values
    that are not one of these names (for example ids that were already
    encoded) are passed through unchanged.

    The previous implementation assigned ``clean_data[data_y == name] = id``,
    which overwrites *every* column of the matching rows (not only the label)
    and only produced encoded labels when ``data_y`` happened to be a view on
    ``clean_data``. This version does not modify either argument.

    Args:
        clean_data: Frame the labels belong to. Kept for call compatibility;
            it is neither read nor modified.
        data_y: pandas Series holding one activity name per row.

    Returns:
        A new Series with the same index as ``data_y`` holding the class ids.
    """
    class_ids = {'dribbling': 0, 'shot': 1, 'pass': 2, 'layup': 3, 'rebound': 4}
    return data_y.map(lambda value: class_ids.get(value, value))

# Integer class id for every game row (the label is the last column).
y_game = labelling(game_data,game_data.iloc[:,-1]).astype(int)

# Append the class ids as the last column, then drop the first two columns.
# np.asarray(y_game) gives a plain 1-D array first: indexing a pandas Series
# with y_game[:, None] is not supported by current pandas releases.
data_labelled_game = np.concatenate((X_game, np.asarray(y_game)[:, None]), axis=1)[:, 2:]
print('data_labelled_game ',data_labelled_game.shape)

# Inputs = all remaining columns but the last; targets = last column.
data_labelled_X = data_labelled_game[:,:-1]
data_labelled_Y = data_labelled_game[:,-1]

print(data_labelled_X.shape,data_labelled_Y.shape)
# Same window length (samples) and overlap (percent) as for training.
sw_length = 50
sw_overlap = 25

X_train_game, y_train_game = apply_sliding_window(data_labelled_X, data_labelled_Y, sliding_window_size=sw_length, 
                                        sampling_rate=50,sliding_window_overlap=sw_overlap)


# float32 inputs; one target per window, taken from the window's first sample.
X_train_game, y_train_game = X_train_game.astype(np.float32), y_train_game[:,0].astype(np.uint8)

print("\nShape of the X_train_game and y_train_game datasets after windowing: ")
print(X_train_game.shape, y_train_game.shape)

# Load the pretrained Model

In [None]:
# Restore the weights saved after the last training epoch (index 249) and
# switch the model to inference mode.
checkpoint = torch.load('./Pre_Trained_models/epoch-249.pt')
torch_model.load_state_dict(checkpoint)
torch_model.eval()

# Testing on game

In [None]:
print('game_data',game_data.shape)

# Label column of the game recordings with its per-class row counts and the
# sorted list of distinct values.
label_game = game_data['basketball']
y_axis_game = label_game.value_counts()
x_axis_game = sorted(label_game.unique())

# Every column except the last one (the label).
X_game = game_data.iloc[:, :-1]

def labelling(clean_data, data_y):
    """Translate basketball activity names into integer class ids.

    The mapping is dribbling=0, shot=1, pass=2, layup=3, rebound=4. Values
    that are not one of these names (for example ids that were already
    encoded) are passed through unchanged.

    The previous implementation assigned ``clean_data[data_y == name] = id``,
    which overwrites *every* column of the matching rows (not only the label)
    and only produced encoded labels when ``data_y`` happened to be a view on
    ``clean_data``. This version does not modify either argument.

    Args:
        clean_data: Frame the labels belong to. Kept for call compatibility;
            it is neither read nor modified.
        data_y: pandas Series holding one activity name per row.

    Returns:
        A new Series with the same index as ``data_y`` holding the class ids.
    """
    class_ids = {'dribbling': 0, 'shot': 1, 'pass': 2, 'layup': 3, 'rebound': 4}
    return data_y.map(lambda value: class_ids.get(value, value))

# Integer class id for every game row (the label is the last column).
y_game = labelling(game_data,game_data.iloc[:,-1]).astype(int)

# Append the class ids as the last column, then drop the first two columns.
# np.asarray(y_game) gives a plain 1-D array first: indexing a pandas Series
# with y_game[:, None] is not supported by current pandas releases.
data_labelled_game = np.concatenate((X_game, np.asarray(y_game)[:, None]), axis=1)[:, 2:]
print('data_labelled_game ',data_labelled_game.shape)

# Inputs = all remaining columns but the last; targets = last column.
data_labelled_X = data_labelled_game[:,:-1]
data_labelled_Y = data_labelled_game[:,-1]

print(data_labelled_X.shape,data_labelled_Y.shape)
# Same window length (samples) and overlap (percent) as for training.
sw_length = 50
sw_overlap = 25

X_train_game, y_train_game = apply_sliding_window(data_labelled_X, data_labelled_Y, sliding_window_size=sw_length, 
                                        sampling_rate=50,sliding_window_overlap=sw_overlap)


# float32 inputs; one target per window, taken from the window's first sample.
X_train_game, y_train_game = X_train_game.astype(np.float32), y_train_game[:,0].astype(np.uint8)

print("\nShape of the X_train_game and y_train_game datasets after windowing: ")
print(X_train_game.shape, y_train_game.shape)

In [None]:
# Convert the game windows to tensors. The targets go to a separate name
# (trailing underscore) so the numpy array y_train_game stays available.
y_train_game_ = torch.from_numpy(y_train_game)
X_train_game = torch.from_numpy(X_train_game).float()

In [None]:
# Pure inference over the whole game set in a single forward pass: make sure
# dropout is off and skip gradient tracking, so no autograd graph is kept in
# memory for all windows.
torch_model.eval()
with torch.no_grad():
    output_game = torch_model(X_train_game)
    loss = criterion(output_game, y_train_game_.long())

In [None]:
# Predicted class per game window = index of the largest score.
y_pred_game = np.argmax(output_game.cpu().detach().numpy(), axis=-1)

# True class per game window as a numpy array.
y_game = y_train_game_.cpu().numpy()

# Keep the game results under their own names. The earlier code left
# g_train_preds / g_train_gt as empty lists and wrote the game results into
# train_preds / train_gt, overwriting the training results.
g_train_preds = np.array(y_pred_game, int)
g_train_gt = np.array(y_game, int)

In [None]:
# Macro-averaged scores on the game windows. The value reported as
# "accuracy" is the Jaccard score.
_macro_safe = dict(average='macro', zero_division=0)
acc_game = jaccard_score(y_game, y_pred_game, average='macro')
pre_game = precision_score(y_game, y_pred_game, **_macro_safe)
reca_game = recall_score(y_game, y_pred_game, **_macro_safe)
f1_game = f1_score(y_game, y_pred_game, **_macro_safe)

# Class ids 0..4 and their names, in id order.
cls = np.arange(5)
class_names = ['dribbling', 'shot', 'pass', 'layup', 'rebound']

print("___GAME____")
print('Game_loss',loss)
print(f"accuracy {round(acc_game,3)*100} ,precision {round(pre_game,3)*100}, recall {round(reca_game,3)*100},f1_score {round(f1_game,3)*100} ")

In [None]:
# Macro-averaged scores on the game windows. The value printed as
# "Accuracy" is the Jaccard score.
print('\nGame RESULTS: ')
for caption, metric in (
    ("\nAvg. Accuracy: {0}", jaccard_score),
    ("Avg. Precision: {0}", precision_score),
    ("Avg. Recall: {0}", recall_score),
    ("Avg. F1: {0}", f1_score),
):
    print(caption.format(metric(y_game, y_pred_game, average='macro')))

# The same four scores, one value per class, rounded to whole percent.
print("\nGAME RESULTS (PER CLASS): ")
for heading, metric in (
    ("\nAccuracy:", jaccard_score),
    ("\nPrecision:", precision_score),
    ("\nRecall:", recall_score),
    ("\nF1:", f1_score),
):
    print(heading)
    per_class = metric(y_game, y_pred_game, average=None, labels=cls)
    for class_name, rslt in zip(class_names, per_class):
        print(f'{class_name}: {round(rslt,2)*100}')