In [1]:
# Imports
import torch
from torch import nn
import sklearn
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import f1_score
import matplotlib.pyplot as plt
import numpy as np
import os

# Check pytorch version
torch.__version__

'2.9.1+cpu'

In [2]:
SEED = 42

In [3]:
# Check and determine device type
if torch.cuda.is_available():
    gpuAvailable = True
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    gpuAvailable = True
    device = torch.device("mps")
else:
    gpuAvailable = False
    device = torch.device("cpu")
print(gpuAvailable, device)

False cpu


In [4]:
# Set dataset up
curdir = os.getcwd()
X = torch.load(os.path.join(curdir, 'model1_input.pt')).to(torch.float32)
y = torch.tensor(np.loadtxt(os.path.join(curdir, 'emotion_targets.csv'), delimiter=','), dtype=torch.float32)

In [5]:
# Dataset variables
train_slice = .80
test_slice = .20

In [6]:
# Setup training, testing, and validation sets
# skf = StratifiedKFold(n_splits=a3)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_slice, random_state=SEED, shuffle=True)
# validation_slice = X_test.shape[0]/X_train.shape[0]
# X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=validation_slice, random_state=SEED, shuffle=True)

In [7]:
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)

torch.Size([19661, 11008]) torch.Size([19661, 8])
torch.Size([4916, 11008]) torch.Size([4916, 8])


In [8]:
class EmotionModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(X_train.shape[1], X_train.shape[1]),
            nn.LeakyReLU(0.01),
            nn.Linear(X_train.shape[1], 5504),
            nn.LeakyReLU(0.01),
            nn.Linear(5504, y_train.shape[1]),
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)

torch.manual_seed(SEED)
emo_model = EmotionModel().to(device)

In [9]:
loss_fn = nn.BCEWithLogitsLoss().to(device)
# optimizer = torch.optim.Adam(emo_model.parameters(), lr=1e-3)
optimizer = torch.optim.SGD(params=emo_model.parameters(), lr=0.0001, momentum=0.9)

In [10]:
def accuracy_fn(y_true, y_pred):
    # correct = torch.eq(y_true, y_pred).sum().item()
    # acc = (correct/len(y_pred)) * 100
    print(y_true.shape, y_pred.shape)
    print("y_true.dtype=",y_true.dtype)
    print("y_pred.dtype=",y_pred.dtype)
    eq_value = torch.eq(y_true, y_pred)
    # print("eq_value.dtype=", eq_value.dtype)
    # print("eq_value=", eq_value)
    acc = (y_true == y_pred).float().mean().item() * 100
    return acc

In [25]:
def multiclass_accuracy_fn(y_pred, target):
    winners = y_pred.argmax(dim=1)
    print(winners.shape)
    
    if target.ndim == 2:
        target_idx = target.argmax(dim=1)
    else:
        target_idx = target
    
    corrects = (winners == target)
    print(corrects.sum().float(), "/", float( target.size(0) ))
    
    accuracy = corrects.sum().float() / float( target.size(0) )
    return accuracy

In [24]:
print(X_train.shape, y_train.shape)

torch.Size([19661, 11008]) torch.Size([19661, 8])


In [26]:
class EmotionModel_T3(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(X_train.shape[1], X_train.shape[1]),
            nn.LeakyReLU(0.01),
            nn.Linear(X_train.shape[1], 5504),
            nn.LeakyReLU(0.01),
            nn.Linear(5504, y_train.shape[1]),
        )
    
    def forward(self, x):
        return self.net(x)

torch.manual_seed(SEED)
emo_model_T3 = EmotionModel_T3().to(device)

loss_fn_T3 = nn.CrossEntropyLoss()
optimizer_T3 = torch.optim.SGD(params=emo_model_T3.parameters(), lr=0.0001, momentum=0.9)

# Training Loop
torch.manual_seed(SEED)
torch.mps.manual_seed(SEED)

# Put data to target device
X_train, y_train = X_train.to(device), y_train.to(device)
X_test, y_test = X_test.to(device), y_test.to(device)

epochs = 100

# Build training and evaluation loop
for epoch in range(1, epochs + 1):
    ### Training
    emo_model_T3.train()

    # 1. Forward pass
    y_logits_T3 = emo_model_T3(X_train)
    y_pred_T3 = torch.softmax(y_logits_T3, dim=1)
    # print("y_pred_T3.shape=", y_pred_T3.shape)
    # print("y_train.shape=", y_train.shape)

    # 2. Calculate loss and accuracy
    y_train_idx = y_train.argmax(dim=1)
    loss_T3 = loss_fn_T3(y_logits_T3, y_train_idx)
    
    acc_T3 = multiclass_accuracy_fn(y_pred=y_pred_T3, target=y_train)
    print("train acc:", acc_T3)

    # 3. Optimizer zero grad
    optimizer_T3.zero_grad()
    
    # 4. Backpropagation
    loss_T3.backward()
    
    # 5. Optimizer Step
    optimizer_T3.step()

    ### Testing
    emo_model_T3.eval()
    with torch.inference_mode():
        # 1. Forward pass
        test_logits_T3 = emo_model_T3(X_test).squeeze()
        test_pred_T3 = torch.softmax(test_logits_T3, dim=1)

        # 2. Calculate the test loss/accuracy
        y_test_idx = y_test.argmax(dim=1)
        test_loss_T3 = loss_fn(test_logits_T3, y_test_idx)
        test_acc_T3 = multiclass_accuracy_fn(y_pred=test_pred_T3, target=y_test)

    # Print epoch output
    if epoch % 10 == 0:
        print(f'Epoch: {epoch} | Loss: {loss_T3:.5f}, Acc: {acc_T3:.2f}% | Test Loss: {test_loss_T3:.5f}, Test Acc: {test_acc_T3:.2f}')

RuntimeError: MPS backend out of memory (MPS allocated: 17.59 GiB, other allocations: 2.70 MiB, max allowed: 18.13 GiB). Tried to allocate 825.61 MiB on private pool. Use PYTORCH_MPS_HIGH_WATERMARK_RATIO=0.0 to disable upper limit for memory allocations (may cause system failure).

In [24]:
# Training Loop
torch.manual_seed(SEED)
torch.mps.manual_seed(SEED)

epochs = 100

# Put data to target device
X_train, y_train = X_train.to(device), y_train.to(device)
X_test, y_test = X_test.to(device), y_test.to(device)

# Build training and evaluation loop
for epoch in range(1, epochs + 1):
    ### Training
    emo_model.train()

    # 1. Forward pass
    y_logits = emo_model(X_train).squeeze()
    y_pred = torch.round(torch.sigmoid(y_logits))

    # 2. Calculate loss and accuracy
    loss = loss_fn(y_logits, y_train)
    acc = accuracy_fn(y_true=y_train, y_pred=y_pred)

    # 3. Optimizer zero grad
    optimizer.zero_grad()
    
    # 4. Backpropagation
    loss.backward()
    
    # 5. Optimizer Step
    optimizer.step()

    ### Testing
    emo_model.eval()
    with torch.inference_mode():
        # 1. Forward pass
        test_logits = emo_model(X_test).squeeze()
        test_pred = torch.round(torch.sigmoid(test_logits))

        # 2. Calculate the test loss/accuracy
        test_loss = loss_fn(test_logits, y_test)
        test_acc = accuracy_fn(y_true=y_test, y_pred=test_pred)

    # Print epoch output
    if epoch % 10 == 0:
        print(f'Epoch: {epoch} | Loss: {loss:.5f}, Acc: {acc:.2f}% | Test Loss: {test_loss:.5f}, Test Acc: {test_acc:.2f}')

Epoch: 10 | Loss: 0.30189, Acc: 88.18% | Test Loss: 0.30987, Test Acc: 87.90
Epoch: 20 | Loss: 0.30038, Acc: 88.19% | Test Loss: 0.30882, Test Acc: 87.90
Epoch: 30 | Loss: 0.29902, Acc: 88.24% | Test Loss: 0.30788, Test Acc: 87.90
Epoch: 40 | Loss: 0.29781, Acc: 88.26% | Test Loss: 0.30709, Test Acc: 87.92
Epoch: 50 | Loss: 0.29673, Acc: 88.29% | Test Loss: 0.30636, Test Acc: 87.92
Epoch: 60 | Loss: 0.29573, Acc: 88.30% | Test Loss: 0.30571, Test Acc: 87.95
Epoch: 70 | Loss: 0.29480, Acc: 88.32% | Test Loss: 0.30513, Test Acc: 87.97
Epoch: 80 | Loss: 0.29394, Acc: 88.34% | Test Loss: 0.30459, Test Acc: 87.99
Epoch: 90 | Loss: 0.29314, Acc: 88.36% | Test Loss: 0.30410, Test Acc: 87.99
Epoch: 100 | Loss: 0.29239, Acc: 88.38% | Test Loss: 0.30365, Test Acc: 88.01


In [14]:
y_train.shape

torch.Size([19661, 8])

In [15]:
y_train.shape

torch.Size([19661, 8])

In [None]:
# epochs=100

# for epoch in range(epochs):
    # 1. Forward pass
    # 2. Calculate the loss
    # 3. Optimize the zero grad
    # 4. Backpropagation
    # 5. Optimizer Step (Gradient Descent)
with torch.inference_mode():
    y_logits = emo_model(X_train.to(device))[:5]
    # print(y_logits)
    y_pred_probs = torch.sigmoid(y_logits)
    # print(y_pred_probs)
    y_preds = torch.round(y_pred_probs)
    y_preds_labels = torch.round(torch.sigmoid(emo_model(X_train.to(device))[:5]))
    print(torch.eq(y_preds.squeeze(), y_preds_labels.squeeze()))
    print(y_preds.squeeze())    
    
    #     f1 = f1_score(y_true_np, y_pred_np, average='macro')
    #     f1_loss = 1.0 - f1

    # print(
    #     f"Epoch {epoch+1:03d}/{epochs} | "
    #     f"loss: {loss.item():.4f} | "
    #     f"acc: {acc:.4f} | "
    #     f"f1: {f1:.4f} | "
    #     f"f1_loss: {f1_loss:.4f}"
    # )

In [12]:
class LinearRegressionModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.weights = nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float32))
        self.bias = nn.Parameter(torch.rand(1, requires_grad=True, dtype=torch.float32))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.weights * x + self.bias

In [13]:
# Training loop example
torch.manual_seed(SEED)
model_0 = LinearRegressionModel()
list(model_0.parameters())

[Parameter containing:
 tensor([0.3367], requires_grad=True),
 Parameter containing:
 tensor([0.3904], requires_grad=True)]

In [14]:
# Set Optimizer and Loss Functions
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model_0.parameters(), lr=0.001, momentum=0.9)

In [None]:
# Training Loop
epochs = 1

for epoch in range(epochs):
    model_0.train()
    
    y_pred = model_0(X_train)

    print(y_pred.shape, y_train.shape)
    loss = loss_fn(y_pred, y_train)

    optimizer.zero_grad()

    loss.backward()

    optimizer.step()
    
    # model_0.eval()