In [67]:
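# Optional one-time setup: uncomment to install into the kernel's environment.
# NOTE(review): the visible cells import torch, not tensorflow - confirm the second line is still needed.
#%pip install pyarrow
#%pip install tensorflow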

In [68]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset,TensorDataset
import pandas as pd
import pyarrow
from torchvision import models, transforms
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

In [69]:
#CONSTANTS
# Exercise name -> integer class index; the index is the name's position in
# the tuple below, so the order of the names must not change.
exercises_dict = {
    name: index
    for index, name in enumerate(
        ('Abduction', 'Bird', 'Bridge', 'Knee', 'Shoulder', 'Squat', 'Stretch')
    )
}

In [4]:
# Read the dataset from 'prep_dataset.parquet' (path relative to the current
# working directory) into a DataFrame, using the pyarrow engine.
df = pd.read_parquet('prep_dataset.parquet', engine='pyarrow')


In [5]:
# NumPy view/copy of the whole frame.
# NOTE(review): `data` is not referenced by any later visible cell, and the frame
# still contains the non-feature columns here, so this presumably materialises a
# large object-dtype array - confirm it is needed before keeping it.
data = df.values

In [8]:
# Target: the exercise name recorded for each row.
y_exercise = df['Exercise']

# Model inputs: every column except the four that are not used as features.
X = df.drop(columns=['Participant', 'Exercise', 'Set', 'Camera'])

# Release the full frame; only X and y_exercise are used from here on.
del df

In [33]:
def _encode_labels(names):
    """Map a pandas Series of exercise names to an int64 tensor of class indices.

    Raises KeyError if a name is missing from `exercises_dict`.
    """
    class_ids = np.array([exercises_dict[name] for name in names.values])
    return torch.tensor(class_ids, dtype=torch.int64)


# Hold out 20% of the rows for testing; the fixed seed keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X, y_exercise, test_size=0.2, random_state=42
)

# From here on the four names refer to tensors rather than pandas objects.
X_train = torch.tensor(X_train.values, dtype=torch.float32)
X_test = torch.tensor(X_test.values, dtype=torch.float32)
y_train = _encode_labels(y_train)
y_test = _encode_labels(y_test)

In [53]:
def get_mlp(in_features=100, hidden_features=256, num_hidden_layers=4, num_classes=7):
    """Build a fully connected classifier.

    The network flattens each sample, applies `num_hidden_layers` blocks of
    Linear + ReLU, and ends with a Linear layer that emits one raw score
    (logit) per class. With the default arguments the layer sequence, and
    therefore the `state_dict` keys, is the same as the original hard-coded
    100 -> 256 -> 256 -> 256 -> 256 -> 7 network, so existing checkpoints
    still load.

    Args:
        in_features: Number of values per sample after flattening.
        hidden_features: Width of every hidden layer.
        num_hidden_layers: Number of Linear + ReLU blocks (at least 1).
        num_classes: Number of output logits.

    Returns:
        A freshly initialised `torch.nn.Sequential`.

    Raises:
        ValueError: If `num_hidden_layers` is smaller than 1.
    """
    if num_hidden_layers < 1:
        raise ValueError(f'num_hidden_layers must be >= 1, got {num_hidden_layers}')

    layers = [torch.nn.Flatten()]
    width = in_features
    for _ in range(num_hidden_layers):
        layers.append(torch.nn.Linear(width, hidden_features))
        layers.append(torch.nn.ReLU())
        width = hidden_features
    # No activation after the last layer: nn.CrossEntropyLoss expects raw logits.
    layers.append(torch.nn.Linear(width, num_classes))
    return torch.nn.Sequential(*layers)

def get_cnn(in_channels=4, num_classes=7):
    """Build a 1-D convolutional classifier.

    Four Conv1d + ReLU stages (the second and fourth use stride 2) are followed
    by global average pooling over the length axis, a kernel-size-1 convolution
    that produces one logit per class, and a Flatten. Input is expected as
    (batch, in_channels, length); output is (batch, num_classes). With the
    default arguments the layers are identical to the original hard-coded
    network.

    Args:
        in_channels: Number of channels in each input sequence.
        num_classes: Number of output logits.

    Returns:
        A freshly initialised `torch.nn.Sequential`.
    """
    return torch.nn.Sequential(
        torch.nn.Conv1d(in_channels=in_channels, out_channels=32, kernel_size=5, stride=1, padding=2),
        torch.nn.ReLU(),
        torch.nn.Conv1d(32, 64, 5, stride=2, padding=2),
        torch.nn.ReLU(),
        torch.nn.Conv1d(64, 64, 5, stride=1, padding=2),
        torch.nn.ReLU(),
        torch.nn.Conv1d(64, 128, 5, stride=2, padding=2),
        torch.nn.ReLU(),
        # Collapse the length axis so the network accepts any sequence length.
        torch.nn.AdaptiveAvgPool1d(1),
        # 1x1 convolution acts as the per-class linear head.
        torch.nn.Conv1d(128, num_classes, 1),
        torch.nn.Flatten(),
    )

In [54]:

"""
Minimal map-style dataset used by the training cells: it pairs an indexable
collection of inputs with an indexable collection of labels and serves each
sample as a dict with the keys 'input' and 'label'. An optional transform is
applied to every sample dict before it is returned.
"""

class CustomDataset(Dataset):
    """Map-style dataset that pairs inputs with labels.

    Each item is a dict ``{'input': x_train[idx], 'label': y_train[idx]}``,
    optionally passed through `transform` before being returned.

    Args:
        x_train: Indexable, sized collection of inputs.
        y_train: Indexable, sized collection of labels, one per input.
        transform: Optional callable that receives the sample dict and returns
            the (possibly modified) sample.

    Raises:
        ValueError: If `x_train` and `y_train` differ in length.
    """

    def __init__(self, x_train, y_train, transform=None):
        # Fail early: a length mismatch would otherwise surface as an
        # IndexError in the middle of an epoch, or silently mispair samples.
        if len(x_train) != len(y_train):
            raise ValueError(
                f'x_train and y_train must have the same length, '
                f'got {len(x_train)} and {len(y_train)}'
            )
        self.x_train = x_train
        self.y_train = y_train
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.x_train)

    def __getitem__(self, idx):
        """Return the sample dict at `idx`, after the optional transform."""
        sample = {'input': self.x_train[idx], 'label': self.y_train[idx]}

        if self.transform is not None:
            sample = self.transform(sample)

        return sample

In [65]:
# Hyper-parameters for the MLP run.
learning_rate = 1e-3
num_epochs = 10
batch_size = 64
model_path = 'MLP.path'

# Wrap the training tensors and serve them in shuffled mini-batches.
custom_dataset = CustomDataset(X_train, y_train)
custom_dataloader = DataLoader(custom_dataset, batch_size=batch_size, shuffle=True)

model = get_mlp()

# Resume from a previous checkpoint when one exists. On a fresh run the file
# has not been written yet (it is saved by a later cell), so fall back to the
# random initialisation instead of aborting the notebook.
try:
    model.load_state_dict(torch.load(model_path))
except FileNotFoundError:
    print(f'No checkpoint found at {model_path!r}; training from scratch.')
model.train()

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [66]:
# Train for `num_epochs` passes over `custom_dataloader`, reporting the mean
# mini-batch loss after each pass.
# TODO: wrap this loop in a reusable train_epoch() helper.
for epoch in range(num_epochs):
    losses = []
    for batch in custom_dataloader:
        inputs = batch['input']
        labels = batch['label']

        # Clear gradients left over from the previous step, then run
        # forward -> loss -> backward -> parameter update.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        losses.append(loss.item())

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {np.mean(losses):.4f}')

Epoch [1/10], Loss: 0.0823
Epoch [2/10], Loss: 0.0837


KeyboardInterrupt: 

In [None]:
# Persist the trained weights to the same path the setup cell loads from,
# so the two cannot drift apart.
torch.save(model.state_dict(), model_path)

In [None]:
# Switch to inference mode before evaluating.
model.eval()

# Make predictions on the test set (no gradients needed)
with torch.no_grad():
    predictions = model(X_test)

# Convert logits to class labels: the highest score wins
predicted_labels = torch.argmax(predictions, dim=1)

# Convert tensors to numpy arrays for scikit-learn
y_test_np = y_test.numpy()
predicted_labels_np = predicted_labels.numpy()

# Print accuracy
accuracy = accuracy_score(y_test_np, predicted_labels_np)
print(f'Accuracy: {accuracy:.4f}')

# Derive the class ids and display names from exercises_dict instead of
# hard-coding the class count, so the report stays in sync with the label
# table and shows exercise names rather than bare integers.
class_names = sorted(exercises_dict, key=exercises_dict.get)
class_ids = [exercises_dict[name] for name in class_names]

# Print classification report
print('Classification Report:')
print(classification_report(
    y_test_np,
    predicted_labels_np,
    labels=class_ids,
    target_names=class_names,
))

In [15]:
# Hyper-parameters for the CNN run.
learning_rate, num_epochs = 1e-3, 10
batch_size = 64

# NOTE(review): these three values are not passed to get_cnn() in this cell,
# and get_cnn() builds its first layer with in_channels=4 - confirm whether
# they are still needed.
input_size, hidden_size, output_size = 400, 64, 7

# NOTE(review): `x_train` (lowercase) is not defined in any visible cell above;
# the split cell defines `X_train`. Confirm where this array comes from.
custom_dataset = CustomDataset(x_train, y_train)
custom_dataloader = DataLoader(custom_dataset, batch_size=batch_size, shuffle=True)

# Fresh CNN with the same loss and optimiser choice as the MLP run.
model = get_cnn()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [14]:
# Move the last axis of x_train to the front and show the resulting shape.
# NOTE(review): this cell overwrites its own input, so every re-run permutes the
# axes again; `x_train` itself is not defined in any visible cell above.
# NOTE(review): Conv1d consumes (batch, channels, length) and get_cnn() uses
# in_channels=4, yet the recorded output puts 100 on the leading axis -
# confirm this is the intended layout.
x_train = np.moveaxis(x_train,-1,0)
print(x_train.shape)


(100, 4, 552545)


In [None]:
# Train for `num_epochs` passes over `custom_dataloader`, reporting the mean
# mini-batch loss once per epoch (the per-batch debug print was removed
# because it flooded the cell output).
for epoch in range(num_epochs):
    losses = []
    for batch in custom_dataloader:
        inputs, labels = batch['input'], batch['label']

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        losses.append(loss.item())

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {np.mean(losses):.4f}')