# Define model and import dependencies

In [1]:
import torch
from torch import optim, nn, utils, Tensor
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import pandas as pd
import numpy as np

In [2]:
# Defining our model
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device=x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device=x.device)

        out, _ = self.lstm(x, (h0, c0))

        out = self.fc(out[:, -1, :])

        return out

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
# Check if GPU is available. But should work fine on CPU too.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

print('device:', device)

device: cuda


## If you're training the model:

In [None]:
# train and validation data
!unzip -q /content/drive/MyDrive/ybigta/2023-2/win_prediction_model/initial.zip -d /content/initial

# Open data_label, contains keypoints numpy array names and labels
data_label = pd.read_csv("/content/initial/data_label.csv", index_col = 0)

In [None]:
# custom defined dataset for our data format
class KeypointsDataset(Dataset):
    def __init__(self, array_paths, labels, num_input):
        """
        Generates a custom numpy dataset

        Args:
            array_paths (list): list of numpy array inputs
            labels (list): list of labels
            num_input (int) : number of inputs to enter
        """
        self.array_paths = array_paths
        self.labels = labels
        self.num_input = num_input

    def __len__(self):
        """
        Returns length of entire dataset
        """
        return len(self.array_paths)

    def __getitem__(self, idx, ):
        """
        Gets the sample that corresponds to the sample id (idx)

        Args:
            idx (int): sample index

        Returns:
            keypoints (torch.Tensor): keypoints input tensor
            label (torch.Tensor): winning label tensor
        """
        keypoints_path = "/content/initial/data/"
        # load and turn numpy arrays into torch tensors
        keypoints = np.load(f"{keypoints_path}{self.array_paths[idx]}.npy")
        keypoints = keypoints[:,-self.num_input:]
        keypoints = torch.tensor(keypoints, dtype = torch.float32)
        label = torch.tensor(self.labels[idx])

        return keypoints, label

In [None]:
from sklearn.model_selection import train_test_split
train_array, val_array = train_test_split(data_label, test_size = 0.1, random_state = 15)

In [None]:
# Initialise the dataset and dataloader
num_in = 100
train_data = KeypointsDataset(
    array_paths = train_array['modified_json_filename'].reset_index(drop = True),
    labels = train_array['label'].reset_index(drop=True),
    num_input = num_in
    )
val_data = KeypointsDataset(
    array_paths = val_array['modified_json_filename'].reset_index(drop= True),
    labels = val_array['label'].reset_index(drop= True),
    num_input = num_in
    )
train_loader = DataLoader(train_data)
val_loader = DataLoader(val_data)

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    # Set the model to training mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.train()
    for i, (inputs, labels) in enumerate(dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)

        loss = loss_fn(outputs, labels)
        loss.backward()

        optimizer.step()

        if i %  51 == 0:
            loss, current = loss.item(), (i + 1) * len(inputs)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloader):
          inputs = inputs.to(device)
          labels = labels.to(device)

          outputs = model(inputs)

          _, predicted = torch.max(outputs.data, 1)
          test_loss += loss_fn(outputs, labels).item()
          correct += (outputs.argmax(1) == labels).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
# Setting up the model and initiating the train & validation loops
model = LSTMModel(input_size=num_in, hidden_size=10, num_layers=1, num_classes=2).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay = 0.1)

num_epochs = 100
for t in range(num_epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_loader, model, criterion, optimizer)
    test_loop(val_loader, model, criterion)
print("Done!")

In [None]:
# If you're continuing to train from a pre-trained version:
pretrained_path = "/content/drive/MyDrive/ybigta/2023-2/win_prediction_model/winpred_model_sd"
model = LSTMModel(input_size=num_in, hidden_size=10, num_layers=1, num_classes=2).to(device)
model.load_state_dict(torch.load(pretrained_path, map_location= torch.device("cpu")), strict = False)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay = 0.1)

num_epochs = 5
for t in range(num_epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_loader, model, criterion, optimizer)
    test_loop(val_loader, model, criterion)
print("Done!")

## If you're doing a test:

### Test Dataset Version

In [9]:
!unzip -q /content/drive/MyDrive/ybigta/2023-2/win_prediction_model/test_keypoints.zip -d /content/test_keypoints

In [10]:
PATH_TO_NUMPY_ARRAY_FOLDER = "/content/test_keypoints/test_keypoints"
PATH_TO_NUMPY_ARRAY_FILENAMES_AND_LABELS = "/content/test_keypoints/test_data_label.csv"
PATH_TO_PRETRAINED_MODEL_STATE_DICT = "/content/drive/MyDrive/ybigta/2023-2/win_prediction_model/winpred_model_sd"

In [11]:
class KeypointsDataset(Dataset):
    def __init__(self, array_paths, labels, num_input):
        """
        Generates a custom numpy dataset

        Args:
            array_paths (list): list of numpy array inputs
            labels (list): list of labels
            num_input (int) : number of inputs to enter
        """
        self.array_paths = array_paths
        self.labels = labels
        self.num_input = num_input

    def __len__(self):
        """
        Returns length of entire dataset
        """
        return len(self.array_paths)

    def __getitem__(self, idx, ):
        """
        Gets the sample that corresponds to the sample id (idx)

        Args:
            idx (int): sample index

        Returns:
            keypoints (torch.Tensor): keypoints input tensor
            label (torch.Tensor): winning label tensor
        """
        keypoints_path = PATH_TO_NUMPY_ARRAY_FOLDER
        # Turn numpy arrays into torch tensors
        keypoints = np.load(f"{keypoints_path}{self.array_paths[idx]}.npy")
        keypoints = keypoints[:,-self.num_input:]
        keypoints = torch.tensor(keypoints, dtype = torch.float32)
        label = torch.tensor(self.labels[idx])

        return keypoints, label
def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloader):
          inputs = inputs.to(device)
          labels = labels.to(device)

          outputs = model(inputs)

          _, predicted = torch.max(outputs.data, 1)
          test_loss += loss_fn(outputs, labels).item()
          correct += (outputs.argmax(1) == labels).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
test_array = pd.read_csv(PATH_TO_NUMPY_ARRAY_FILENAMES_AND_LABELS)
num_in = 100 # DO NOT ALTER
test_data = KeypointsDataset(
    array_paths = test_array['modified_json_filename'].reset_index(drop= True),
    labels = test_array['label'].reset_index(drop= True),
    num_input = num_in
    )
test_loader = DataLoader(test_data)
pretrained_path = PATH_TO_PRETRAINED_MODEL_STATE_DICT
model = LSTMModel(input_size=num_in, hidden_size=10, num_layers=1, num_classes=2).to(device)
model.load_state_dict(torch.load(pretrained_path, map_location= torch.device("cuda")), strict = False)
criterion = nn.CrossEntropyLoss()

In [12]:
# Run this after setting all the paths, and the above cell (loading the model)
test_loop(test_loader, model, criterion)

Test Error: 
 Accuracy: 37.5%, Avg loss: 0.753404 



### Individual File Version

In [15]:
!unzip -q /content/drive/MyDrive/ybigta/2023-2/win_prediction_model/test_keypoints.zip -d /content/test_keypoints

replace /content/test_keypoints/31_person.npy? [y]es, [n]o, [A]ll, [N]one, [r]ename: A


In [16]:
PATH_TO_NUMPY_ARRAY = "/content/test_keypoints/31_person.npy"
PATH_TO_PRETRAINED_MODEL_STATE_DICT = "/content/drive/MyDrive/ybigta/2023-2/win_prediction_model/winpred_model_sd"

In [17]:
numpy_data = np.load(PATH_TO_NUMPY_ARRAY)

pretrained_path = PATH_TO_PRETRAINED_MODEL_STATE_DICT
model = LSTMModel(input_size=num_in, hidden_size=10, num_layers=1, num_classes=2).to(device)
# If there's no GPU, change from device("cuda") to device("cpu")
model.load_state_dict(torch.load(pretrained_path, map_location= torch.device("cuda")), strict = False)

def test(numpy_data_raw, model):
    data = torch.tensor(numpy_data_raw[:, -100:], dtype = torch.float32)
    data = data[None, :]
    data = data.to(device)
    model.eval()
    outputs = model(data)
    _, predicted = torch.max(outputs.data, 1)
    predicted_int = predicted.type(torch.int)[0]
    print(f"The result is:{predicted_int:d}")
    return predicted_int
test(numpy_data, model)

The result is:0


tensor(0, device='cuda:0', dtype=torch.int32)