In [109]:
import os
import numpy as np
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from torch.optim import Adam
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, TensorDataset

: 

In [None]:
print(f'PyTorch version: {torch.__version__}')
print('*'*10)
print(f'_CUDA version: ')
!nvcc --version
print('*'*10)
print(f'CUDNN version: {torch.backends.cudnn.version()}')
print(f'Available GPU devices: {torch.cuda.device_count()}')
print(f'Device Name: {torch.cuda.get_device_name()}')

## Zdobycie danych

In [13]:
DATA_PATH = os.path.abspath(os.path.join('..', 'data', 'processed', 'ravdess'))

In [14]:
LANDMARK_INDEXES = [
    76, 306,  # mouth corners
    74, 73, 72, 11, 302, 303, 304, # upper lip
    90, 180, 85, 16, 315, 404, 320, # lower lip
    33, 161, 159, 157, 133, 154, 145, 163,  # left eye
    70, 63, 105, 66, 107,  # left eyebrow
    362, 384, 386, 388, 263, 390, 374, 381,  # right eye
    300, 293, 334, 296, 336,  # right eyebrow
    1, 5, 197, 168  # nose
]

REFERENCE_LANDMARK_INDEX = 0  # Middle of face

In [15]:
def convert_coordinates_to_avg_distance(frames):
    out = []
    for frame in frames:
        frame_distance = 0
        for landmark_idx in LANDMARK_INDEXES:
            frame_distance += math.sqrt(
                (frame[REFERENCE_LANDMARK_INDEX][0] - frame[landmark_idx][0])**2 +
                (frame[REFERENCE_LANDMARK_INDEX][1] - frame[landmark_idx][1])**2
            )
        out.append(frame_distance / len(LANDMARK_INDEXES))
    return np.array(out)

In [86]:
def load_data(data_path):
    all_data = []
    all_labels = []

    for file in os.listdir(data_path):
        if file.endswith(".npy"):
            data = np.load(os.path.join(data_path, file), allow_pickle=True)
            data = np.array(data, dtype=np.float32)

            all_data.append(data)

            label = int(file.split("-")[2])
            all_labels.append(label)

    return np.array(all_data, dtype=object), np.array(all_labels)

In [87]:
all_data, all_labels = load_data(DATA_PATH)

## Preprocessing danych

In [96]:
def preprocess_data(data, labels):
    tensor_data = [torch.tensor(d, dtype=torch.float32) for d in data]
    padded_data = pad_sequence(tensor_data, batch_first=True)

    encoder = LabelBinarizer()
    encoded_labels = encoder.fit_transform(labels)
    encoded_labels = torch.tensor(encoded_labels, dtype=torch.float32)

    X_train, X_temp, y_train, y_temp = train_test_split(
        padded_data, encoded_labels, test_size=0.3, random_state=42
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42
    )

    return X_train, X_val, X_test, y_train, y_val, y_test

In [97]:
X_train, X_val, X_test, y_train, y_val, y_test = preprocess_data(all_data, all_labels)

In [None]:
print(X_train.shape)
print(y_train.shape)

In [None]:
# Class distribution
from collections import Counter

for key, val in sorted(Counter((label.argmax().item() for label in y_train)).items(), key=lambda i: i[0]):
    print(f"{key}:{val}")

### W podejściu wykorzystane zostaną 2 modele - pierwszy z nich będzie siecią konwolucyjną 2d, która będzie miała za zadanie nauczyć się rozpoznawać cechy charakterystyczne dla wybranej klatki (zbioru współrzędnych pkt charakterystycznych). Do klasyfikacji szeregu czasowego zostanie wykorzystana sekwencyjna sieć neuronowa LSTM.

## Zbudowanie modelu ekstrakcji cech

In [200]:
class EmotionClassifier(nn.Module):
    def __init__(self):
        super(EmotionClassifier, self).__init__()
        
        # Spatial feature extraction using Conv1D
        self.conv1 = nn.Conv1d(in_channels=2, out_channels=32, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool1d(kernel_size=2)
        
        # LSTM layers for temporal feature extraction
        self.lstm1 = nn.LSTM(input_size=32 * 239, hidden_size=128, batch_first=True, bidirectional=True)
        self.lstm2 = nn.LSTM(input_size=128 * 2, hidden_size=64, batch_first=True)
        
        # Fully connected classification layer
        self.fc = nn.Linear(64, 8)  # 8 emotion classes

    def forward(self, x):
        # x shape: (batch_size, frames, landmarks, coordinates)
        batch_size, frames, landmarks, coordinates = x.shape
        
        # Reshape for Conv1D: (batch_size * frames, landmarks, coordinates)
        x = x.view(-1, landmarks, coordinates).permute(0, 2, 1)
        
        # Spatial feature extraction
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        
        # Flatten spatial features
        x = x.view(batch_size, frames, -1)  # (batch_size, frames, features)
        
        # Temporal feature extraction
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        
        # Classification
        x = self.fc(x[:, -1, :])  # Take the last timestep's output
        return x

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = EmotionClassifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=1e-4)

## Train model

In [202]:
BATCH_SIZE = 32
EPOCHS = 50

In [203]:
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
writer = SummaryWriter("runs/emotion_classifier")

for epoch in range(EPOCHS):
    model.train()
    train_loss = 0
    correct = 0
    total = 0
    
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        y_batch = y_batch.argmax(dim=1)
        
        # Forward pass
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        _, predicted = outputs.max(1)
        correct += predicted.eq(y_batch).sum().item()
        total += y_batch.size(0)
    
    train_acc = correct / total

    # Validation
    model.eval()
    val_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_batch = y_batch.argmax(dim=1)
            
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
            _, predicted = outputs.max(1)
            correct += predicted.eq(y_batch).sum().item()
            total += y_batch.size(0)
    
    val_acc = correct / total

    writer.add_scalar("Loss/Train", train_loss, epoch)
    writer.add_scalar("Loss/Validation", val_loss, epoch)
    writer.add_scalar("Accuracy/Train", train_acc, epoch)
    writer.add_scalar("Accuracy/Validation", val_acc, epoch)

    print(f"Epoch {epoch + 1}/{EPOCHS}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
    
writer.close()


Epoch 1/50, Train Loss: 130.4258, Train Acc: 0.1426, Val Loss: 28.9031, Val Acc: 0.1276
Epoch 2/50, Train Loss: 130.0251, Train Acc: 0.1491, Val Loss: 28.8230, Val Acc: 0.1276
Epoch 3/50, Train Loss: 129.9188, Train Acc: 0.1595, Val Loss: 28.7528, Val Acc: 0.1624
Epoch 4/50, Train Loss: 129.6616, Train Acc: 0.1665, Val Loss: 28.6748, Val Acc: 0.1578
Epoch 5/50, Train Loss: 129.0575, Train Acc: 0.1759, Val Loss: 28.5606, Val Acc: 0.1694
Epoch 6/50, Train Loss: 128.9573, Train Acc: 0.1750, Val Loss: 28.4839, Val Acc: 0.1694
Epoch 7/50, Train Loss: 128.4577, Train Acc: 0.1754, Val Loss: 28.3487, Val Acc: 0.1740
Epoch 8/50, Train Loss: 128.0457, Train Acc: 0.1814, Val Loss: 28.1774, Val Acc: 0.1717
Epoch 9/50, Train Loss: 127.1643, Train Acc: 0.1953, Val Loss: 27.9658, Val Acc: 0.1810
Epoch 10/50, Train Loss: 126.1503, Train Acc: 0.1968, Val Loss: 27.7974, Val Acc: 0.1647
Epoch 11/50, Train Loss: 125.5527, Train Acc: 0.1958, Val Loss: 27.6124, Val Acc: 0.1903
Epoch 12/50, Train Loss: 124.9

## Eval model

In [188]:
model.eval()

test_loss = 0
correct = 0
total = 0

with torch.no_grad():
    for X_batch, y_batch in DataLoader(TensorDataset(X_test, y_test), batch_size=BATCH_SIZE, shuffle=False):
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        y_batch = y_batch.argmax(dim=1)
        
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        
        test_loss += loss.item()
        _, predicted = outputs.max(1)
        correct += predicted.eq(y_batch).sum().item()
        total += y_batch.size(0)

test_loss /= len(y_test)
test_acc = correct / total

print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

Test Loss: 0.0614, Test Accuracy: 0.2245
