In [1]:
import json
import random
import time

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch import Tensor
from torch.utils.data import DataLoader

In [2]:
# Settings

cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if cuda else "cpu")

batch_size = 1
num_workers = 1
sequence_length = 3
learning_rate = 1e-3

params = {"batch_size": batch_size, "shuffle": True, "num_workers": num_workers}

match_1 = json.load(open("../data/dataset/match_1.json"))
match_2 = json.load(open("../data/dataset/match_2.json"))

ACTIONS = {
    "no action": torch.tensor([1, 0, 0, 0, 0, 0, 0, 0, 0]),
    "run": torch.tensor([0, 1, 0, 0, 0, 0, 0, 0, 0]),
    "pass": torch.tensor([0, 0, 1, 0, 0, 0, 0, 0, 0]),
    "rest": torch.tensor([0, 0, 0, 1, 0, 0, 0, 0, 0]),
    "walk": torch.tensor([0, 0, 0, 0, 1, 0, 0, 0, 0]),
    "dribble": torch.tensor([0, 0, 0, 0, 0, 1, 0, 0, 0]),
    "shot": torch.tensor([0, 0, 0, 0, 0, 0, 1, 0, 0]),
    "tackle": torch.tensor([0, 0, 0, 0, 0, 0, 0, 1, 0]),
    "cross": torch.tensor([0, 0, 0, 0, 0, 0, 0, 0, 1]),
}

In [3]:
# Data formatting

def generate_subsequences(sequence, subsequence_length):
    for i in range(len(sequence) - subsequence_length + 1):
        yield sequence[i:i+subsequence_length]

actions_1 = list(map(lambda x: x["label"], match_1))
actions_2 = list(map(lambda x: x["label"], match_2))
subsequences = [i for i in generate_subsequences(actions_1, sequence_length + 1)] + [i for i in generate_subsequences(actions_2, sequence_length + 1)]
probability = dict()

def populate_prob(dictionary, x):
    dictionary["-".join(x[:-1])] = dictionary.get("-".join(x[:-1]) ,torch.tensor([0, 0, 0, 0, 0, 0, 0, 0, 0])) + ACTIONS[x[sequence_length]]

list(map(lambda x: populate_prob(probability, x), subsequences))
probability = dict(map(lambda x: (x, probability[x]/sum(probability[x])), probability))

data = [i for i in generate_subsequences(actions_1, sequence_length)] + [i for i in generate_subsequences(actions_2, sequence_length)]
dataset = list(map(lambda d: (torch.cat(list(map(lambda x: (torch.unsqueeze(ACTIONS[x], 0)), d)), dim=-2).type(torch.float), probability['-'.join(d)].type(torch.float)), data))
loader = DataLoader(dataset, **params)

In [4]:
# Model

class ExtractLSTM(nn.Module):
    def forward(self, x):
        tensor, _ = x
        return tensor[:, -1, :]

class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.fc = nn.Identity()

    def forward(self, x: Tensor) -> Tensor:
        output = self.fc(x)
        return output

    def save(self, path: str="../models") -> str:
        name = f"{self._get_name()}_{int(time.time())}"
        torch.save(self.fc.state_dict(), f"{path.rstrip('/')}/{name}")
        return name

    def load(self, model_path: str):
        self.fc.load_state_dict(torch.load(model_path))
        self.eval()
        return self

class ActionModel(Model):
    def __init__(self, sequence_length: int=3):
        super(ActionModel, self).__init__()
        self.sequence_length = sequence_length
        self.fc = nn.Sequential(
            nn.LSTM(9, 5, self.sequence_length),
            ExtractLSTM(),
            nn.Linear(5, 9),
            nn.Sigmoid()
        )

    def forward(self, x: Tensor) -> Tensor:
        output = self.fc(x)
        return output

    def predict(self, x: Tensor) -> str:
        output = self(x)
        output = random.choices(list(ACTIONS.items()), weights=torch.squeeze(torch.squeeze(output, dim=0), dim=0))[0]
        return output

def train(loader: DataLoader, model: Model, criterion: nn.Module, optimizer: optim, num_epochs: int=10) -> list[np.ndarray]:
    """
    Train function
    """
    start_time = time.time()
    losses = []
    for epoch in range(num_epochs):
        if len(losses)>3 and round(max(losses[-3:]), 3) == round(min(losses[-3:]), 3):
            print("Stopped by stable loss")
            break
        losses_ = []
        for data in loader:
            #Data loading
            x = data[0]
            y = data[1]
            if cuda:
                x = data[0].cuda()
                y = data[1].cuda()
            x.requires_grad = True
            y.requires_grad = True
            output = model(x)
            #Backpropagation
            optimizer.zero_grad()
            loss = criterion(y, output)
            loss.backward()
            optimizer.step()
            #Display & metrics
            losses_.append(float(loss))
            print('Epoch {:>4} of {}, Train Loss: {:5.2f}, Elapsed time: {:.2f}'.format(epoch+1, num_epochs, losses_[-1], time.time()-start_time),end="\r",flush=True)
        losses.append(np.mean(losses_))
        print('Epoch {:>4} of {}, Train Loss: {:5.2f}, Elapsed time: {:.2f}'.format(epoch+1, num_epochs, losses[-1], time.time()-start_time),end="\n",flush=True)
    return losses

In [5]:
model = ActionModel()

losses = train(loader, model, nn.MSELoss(), optim.Adam(model.parameters(), lr=learning_rate), 10)

model.save()

Epoch    1 of 10, Train Loss:  0.06, Elapsed time: 3.14
Epoch    2 of 10, Train Loss:  0.03, Elapsed time: 5.94
Epoch    3 of 10, Train Loss:  0.03, Elapsed time: 8.68
Epoch    4 of 10, Train Loss:  0.02, Elapsed time: 11.41
Epoch    5 of 10, Train Loss:  0.01, Elapsed time: 14.13
Epoch    6 of 10, Train Loss:  0.01, Elapsed time: 16.83
Epoch    7 of 10, Train Loss:  0.01, Elapsed time: 19.80
Epoch    8 of 10, Train Loss:  0.01, Elapsed time: 22.61
Epoch    9 of 10, Train Loss:  0.01, Elapsed time: 26.08
Epoch   10 of 10, Train Loss:  0.01, Elapsed time: 29.38


'ActionModel_1698809867'

In [6]:
n = 20
def predict(n, sequence_length):
    """
    predict n actions
    """
    keys = list()
    tensors = [torch.randn([9]) for _ in range(sequence_length)]
    print(f"Prediction of length {n}")
    for i in range(n):
        x = torch.unsqueeze(torch.cat(list(map(lambda t: torch.unsqueeze(t, dim=0), tensors[-sequence_length:])), dim=0).type(torch.float), dim=0)
        key, output = model.predict(x)
        keys.append(key)
        tensors.append(torch.squeeze(output, dim=0))

    print(" -> ".join(keys), end="\n\n")

for i in range(10):
    predict(n, sequence_length)

Prediction of length 20
walk -> walk -> walk -> rest -> rest -> walk -> walk -> run -> dribble -> walk -> walk -> walk -> walk -> walk -> run -> run -> run -> run -> run -> run

Prediction of length 20
run -> run -> run -> run -> run -> run -> run -> run -> run -> dribble -> walk -> no action -> walk -> dribble -> pass -> walk -> walk -> walk -> walk -> rest

Prediction of length 20
run -> rest -> walk -> dribble -> run -> run -> run -> run -> run -> run -> run -> walk -> walk -> pass -> walk -> walk -> dribble -> dribble -> pass -> walk

Prediction of length 20
run -> run -> dribble -> dribble -> shot -> walk -> shot -> run -> tackle -> run -> run -> run -> run -> run -> shot -> dribble -> run -> run -> run -> run

Prediction of length 20
walk -> rest -> walk -> walk -> pass -> run -> run -> run -> dribble -> shot -> run -> run -> tackle -> run -> run -> pass -> walk -> cross -> run -> run

Prediction of length 20
run -> run -> run -> run -> run -> dribble -> dribble -> walk -> rest -