In [None]:
import os
import torch
import torch.nn as nn
import torch.optim
import torch.utils.data
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.metrics import confusion_matrix

# Emotion class names, looked up by integer class id when titling plots.
# NOTE(review): assumes this order matches the class indices produced by ImageFolder
# (alphabetical folder order) — confirm against the dataset layout.
emotion_labels = ['Neutral', 'Happiness', 'Sadness', 'Surprise', 'Fear', 'Disgust', 'Anger']

# Seed the torch and NumPy random generators so that runs are repeatable
# (weight init of the new layers, shuffling, random sample previews).
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)

# Use the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Dataset locations
data_path = "/kaggle/input/raf-db-zmienione-labele"
train_dir = os.path.join(data_path, "train")
test_dir = os.path.join(data_path, "test")

# Training hyper-parameters
num_classes = len(emotion_labels)
batch_size = 64
num_epochs = 20
learning_rate = 0.01
momentum = 0.9
weight_decay = 1e-4

# Per-channel input normalization
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])

# Training pipeline: resize, light augmentation, tensor conversion, normalization
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    normalize,
])

# Evaluation pipeline: deterministic, no augmentation
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    normalize,
])


In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
import torch.nn.functional as F

class LocalFeatureExtractor(nn.Module):
    """Extract local features from the four spatial quadrants of a feature map.

    The input is split into top-left, bottom-left, top-right and bottom-right
    quadrants. Each quadrant goes through its own branch: a depthwise 3x3
    convolution with stride 2, batch norm, ReLU, a pointwise 1x1 convolution to
    ``planes`` channels, batch norm and ReLU. The four branch outputs are then
    stitched back together in their original spatial arrangement.

    Args:
        inplanes: number of input channels.
        planes: number of output channels produced by every branch.
    """

    def __init__(self, inplanes, planes):
        super(LocalFeatureExtractor, self).__init__()
        # Branches are registered as conv{i}_1 / bn{i}_1 / conv{i}_2 / bn{i}_2
        # (i = 1..4), in that order, so parameter names stay the same as before.
        for index in range(1, 5):
            setattr(self, f"conv{index}_1",
                    nn.Conv2d(inplanes, inplanes, kernel_size=3, stride=2, padding=1,
                              groups=inplanes, bias=False))
            setattr(self, f"bn{index}_1", nn.BatchNorm2d(inplanes))
            setattr(self, f"conv{index}_2",
                    nn.Conv2d(inplanes, planes, kernel_size=1, stride=1, bias=False))
            setattr(self, f"bn{index}_2", nn.BatchNorm2d(planes))

        self.relu = nn.ReLU(inplace=True)

    def _branch(self, index, patch):
        """Run one quadrant through branch ``index`` (1..4)."""
        depthwise = getattr(self, f"conv{index}_1")
        depthwise_bn = getattr(self, f"bn{index}_1")
        pointwise = getattr(self, f"conv{index}_2")
        pointwise_bn = getattr(self, f"bn{index}_2")
        hidden = self.relu(depthwise_bn(depthwise(patch)))
        return self.relu(pointwise_bn(pointwise(hidden)))

    def forward(self, x):
        """Return the re-assembled quadrant features for a 4-D input ``x``.

        The quadrant boundaries are derived from the actual input size. With
        fixed 0:28 / 28:56 bounds only the top-left 56x56 corner of a larger
        input would be processed; for a 56x56 input the split is unchanged.
        Height and width must both be at least 2.
        """
        mid_h = x.size(2) // 2
        mid_w = x.size(3) // 2

        patch_11 = x[:, :, :mid_h, :mid_w]   # top-left
        patch_21 = x[:, :, mid_h:, :mid_w]   # bottom-left
        patch_12 = x[:, :, :mid_h, mid_w:]   # top-right
        patch_22 = x[:, :, mid_h:, mid_w:]   # bottom-right

        out_1 = self._branch(1, patch_11)
        out_2 = self._branch(2, patch_21)
        out_3 = self._branch(3, patch_12)
        out_4 = self._branch(4, patch_22)

        # Stack each column vertically (dim 2), then join the columns side by side (dim 3).
        left = torch.cat([out_1, out_2], dim=2)
        right = torch.cat([out_3, out_4], dim=2)
        return torch.cat([left, right], dim=3)

# Attention mechanism
class AttentionModule(nn.Module):
    """Gate a feature map with element-wise weights in (0, 1).

    The weights come from a small bottleneck of two 1x1 convolutions
    (``in_channels -> in_channels // 16 -> in_channels``) with a ReLU in between
    and a sigmoid at the end. The input is multiplied by those weights.

    Args:
        in_channels: channel count of the input; the bottleneck width is
            ``in_channels // 16``.
    """

    def __init__(self, in_channels):
        super().__init__()
        bottleneck = in_channels // 16
        self.attention = nn.Sequential(
            nn.Conv2d(in_channels, bottleneck, kernel_size=1),
            nn.ReLU(),
            nn.Conv2d(bottleneck, in_channels, kernel_size=1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return ``x`` scaled by its attention weights (same shape as ``x``)."""
        return x * self.attention(x)

# Hybrid EfficientFace-ResNet model
class EfficientFaceResNet(nn.Module):
    """ResNet-50 backbone with per-stage attention and a local-feature branch.

    The image goes through the ResNet-50 stem and its four residual stages; an
    ``AttentionModule`` gates the output of every stage. In parallel, a
    ``LocalFeatureExtractor`` runs on the raw input. Its output is projected to
    2048 channels, resized to the backbone's final spatial size and added to the
    backbone features before pooling and classification.

    Args:
        num_classes: number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        # NOTE(review): newer torchvision releases prefer the `weights=` argument
        # over `pretrained=True` — confirm against the installed version.
        self.resnet = models.resnet50(pretrained=True)
        self.local_feature_extractor = LocalFeatureExtractor(3, 116)

        # One attention gate per residual stage: attention1 .. attention4.
        for stage_no, width in enumerate((256, 512, 1024, 2048), start=1):
            setattr(self, f"attention{stage_no}", AttentionModule(width))

        # 1x1 convolution lifting the local features to the backbone's channel count
        self.local_feature_adapter = nn.Conv2d(116, 2048, kernel_size=1)

        # Classification head (the backbone's own `fc` layer is not used in forward)
        self.fc = nn.Linear(2048, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for an image batch."""
        # Local branch works on the raw input
        local_features = self.local_feature_extractor(x)

        # Backbone stem
        backbone = self.resnet
        features = backbone.maxpool(backbone.relu(backbone.bn1(backbone.conv1(x))))

        # Residual stages, each followed by its attention gate
        stages = (
            (backbone.layer1, self.attention1),
            (backbone.layer2, self.attention2),
            (backbone.layer3, self.attention3),
            (backbone.layer4, self.attention4),
        )
        for stage, gate in stages:
            features = gate(stage(features))

        # Match the local features to the backbone output and fuse by addition
        local_map = F.interpolate(
            self.local_feature_adapter(local_features),
            size=features.size()[2:],
            mode='bilinear',
            align_corners=False,
        )
        fused = features + local_map

        pooled = torch.flatten(backbone.avgpool(fused), 1)
        return self.fc(pooled)

# Build the model; the class count comes from the config cell instead of a hard-coded 7
model = EfficientFaceResNet(num_classes=num_classes).to(device)

In [None]:
# Load the training and test datasets (one sub-folder per class)
train_dataset = datasets.ImageFolder(root=train_dir, transform=train_transform)
test_dataset = datasets.ImageFolder(root=test_dir, transform=test_transform)

# Options shared by both loaders; only the training loader shuffles
_loader_options = dict(batch_size=batch_size, num_workers=4, pin_memory=True)
train_loader = torch.utils.data.DataLoader(train_dataset, shuffle=True, **_loader_options)
test_loader = torch.utils.data.DataLoader(test_dataset, shuffle=False, **_loader_options)
# Display example images
def show_samples(dataset, num_samples=7, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Plot randomly chosen images from ``dataset`` in a single row.

    Args:
        dataset: indexable dataset returning ``(image_tensor, label)`` pairs, where
            the image is a normalized channel-first tensor and the label indexes
            ``emotion_labels``.
        num_samples: number of images to draw (with replacement).
        mean: per-channel mean used to undo the normalization.
        std: per-channel standard deviation used to undo the normalization.
    """
    # squeeze=False keeps `axes` two-dimensional, so num_samples=1 works too
    # (otherwise plt.subplots returns a bare Axes that cannot be indexed).
    fig, axes = plt.subplots(1, num_samples, figsize=(15, 3), squeeze=False)
    mean_arr = np.array(mean)
    std_arr = np.array(std)
    for ax in axes[0]:
        idx = np.random.randint(len(dataset))
        img, label = dataset[idx]
        # Channel-first tensor -> channel-last array for matplotlib
        img = img.permute(1, 2, 0).numpy()
        # Undo the normalization and clamp to the displayable range
        img = (img * std_arr + mean_arr).clip(0, 1)
        ax.imshow(img)
        ax.set_title(emotion_labels[label])
        ax.axis('off')
    plt.show()

print("Przykładowe obrazy treningowe:")
show_samples(train_dataset)
print("Przykładowe obrazy testowe:")
show_samples(test_dataset)

# Make sure the model is on the selected device
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=learning_rate,
    momentum=momentum,
    weight_decay=weight_decay,
)
# NOTE(review): this StepLR scheduler is never stepped anywhere in the visible
# notebook; the training cell drives the learning rate through LRScheduler instead.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

In [None]:
class AverageMeter(object):
    """Track the most recent value and the weighted running mean of a metric.

    Attributes set by ``reset``/``update``: ``val`` (last value), ``sum``
    (weighted total), ``count`` (total weight) and ``avg`` (``sum / count``).
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero every statistic."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed over ``n`` samples and refresh the mean."""
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

class EarlyStopping:
    """Signal when the validation loss has stopped improving and checkpoint the best model.

    Call the instance once per epoch with the validation loss and the model.
    Whenever the loss improves, the model's ``state_dict`` is saved to ``path``
    and the patience counter is reset; otherwise the counter is incremented and
    ``early_stop`` becomes True once it reaches ``patience``.

    Args:
        patience: number of consecutive non-improving calls tolerated.
        verbose: if True, print a message every time a checkpoint is written.
        delta: margin added to the best score in the improvement comparison.
        path: file the best ``state_dict`` is written to.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # Lower-case np.inf exists in every NumPy release; the np.Inf alias was
        # removed in NumPy 2.0 and raises AttributeError there.
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        """Update the stopping state with this epoch's ``val_loss``."""
        # Higher score is better, so the loss is negated.
        score = -val_loss

        # A NaN loss compares False against everything, so written this way it
        # is counted as "no improvement" instead of overwriting the checkpoint.
        is_valid = score == score
        improved = bool(is_valid) and (
            self.best_score is None or bool(score >= self.best_score + self.delta)
        )

        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0
        else:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, val_loss, model):
        """Write ``model.state_dict()`` to ``self.path`` and remember ``val_loss``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

class LRScheduler:
    """Thin wrapper around ``ReduceLROnPlateau`` driven by the validation loss.

    Call the instance once per epoch with the validation loss. When the loss has
    not improved for more than ``patience`` calls, the learning rate of every
    parameter group is multiplied by ``factor`` (never going below ``min_lr``).

    Args:
        optimizer: optimizer whose learning rate is adjusted.
        patience: number of non-improving calls tolerated before reducing.
        min_lr: lower bound for the learning rate.
        factor: multiplicative reduction factor.
    """

    def __init__(self, optimizer, patience=5, min_lr=1e-6, factor=0.5):
        self.optimizer = optimizer
        self.patience = patience
        self.min_lr = min_lr
        self.factor = factor
        # `verbose` is deliberately not passed: it is deprecated since PyTorch 2.2
        # and newer releases may reject it. Changes are reported in __call__ instead.
        self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer,
            mode='min',
            patience=self.patience,
            factor=self.factor,
            min_lr=self.min_lr,
        )

    def __call__(self, val_loss):
        """Feed ``val_loss`` to the scheduler and print any learning-rate change."""
        previous_lrs = [group['lr'] for group in self.optimizer.param_groups]
        self.lr_scheduler.step(val_loss)
        for index, (old_lr, group) in enumerate(zip(previous_lrs, self.optimizer.param_groups)):
            new_lr = group['lr']
            if new_lr != old_lr:
                print(f'Reducing learning rate of group {index} to {new_lr:.4e}.')

def accuracy(output, target, topk=(1,)):
    """Compute top-k accuracy (in percent) for each ``k`` in ``topk``.

    Args:
        output: score tensor indexed as ``(sample, class)``.
        target: tensor of true class indices, one per sample.
        topk: iterable of ``k`` values to evaluate.

    Returns:
        A list with one single-element tensor per ``k``, holding the percentage
        of samples whose true class is among the ``k`` highest scores.
    """
    with torch.no_grad():
        deepest_k = max(topk)
        num_samples = target.size(0)

        # Indices of the best `deepest_k` classes, laid out as (rank, sample)
        ranked = torch.topk(output, deepest_k, dim=1, largest=True, sorted=True)[1].t()
        hits = ranked.eq(target.view(1, -1).expand_as(ranked))

        return [
            hits[:k].reshape(-1).float().sum(0, keepdim=True) * (100.0 / num_samples)
            for k in topk
        ]

In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: network to optimize; switched to training mode.
        train_loader: iterable of ``(images, targets)`` batches.
        criterion: loss function called as ``criterion(outputs, targets)``.
        optimizer: optimizer updating the model parameters.
        device: device the batches are moved to.

    Returns:
        ``(mean_loss, mean_top1_accuracy)``, both weighted by batch size.
    """
    model.train()
    loss_meter, acc_meter = AverageMeter(), AverageMeter()

    for images, targets in train_loader:
        images = images.to(device)
        targets = targets.to(device)
        n_images = images.size(0)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, targets)

        # Bookkeeping on the pre-update predictions
        (acc1,) = accuracy(outputs, targets, topk=(1,))
        loss_meter.update(loss.item(), n_images)
        acc_meter.update(acc1[0], n_images)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return loss_meter.avg, acc_meter.avg

def validate(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without computing gradients.

    Args:
        model: network to evaluate; switched to evaluation mode.
        val_loader: iterable of ``(images, targets)`` batches.
        criterion: loss function called as ``criterion(outputs, targets)``.
        device: device the batches are moved to.

    Returns:
        ``(mean_loss, mean_top1_accuracy)``, both weighted by batch size.
    """
    model.eval()
    loss_meter, acc_meter = AverageMeter(), AverageMeter()

    with torch.no_grad():
        for images, targets in val_loader:
            images = images.to(device)
            targets = targets.to(device)
            n_images = images.size(0)

            outputs = model(images)
            loss = criterion(outputs, targets)

            (acc1,) = accuracy(outputs, targets, topk=(1,))
            loss_meter.update(loss.item(), n_images)
            acc_meter.update(acc1[0], n_images)

    return loss_meter.avg, acc_meter.avg

In [None]:
early_stopping = EarlyStopping(patience=10, verbose=True)
lr_scheduler = LRScheduler(optimizer)

# Per-epoch history, used later to plot the learning curves
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

best_val_acc = 0.0

# NOTE(review): test_loader doubles as the validation set here. It drives LR
# scheduling, early stopping and best-model selection, so metrics later reported
# on the same loader are optimistically biased. A held-out validation split
# would avoid this.
for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}/{num_epochs}')

    # Training. Metrics are converted to plain floats once, so the history lists
    # and best_val_acc never hold 0-d (possibly GPU) tensors.
    train_loss, train_acc = (float(metric) for metric in
                             train_epoch(model, train_loader, criterion, optimizer, device))
    train_losses.append(train_loss)
    train_accuracies.append(train_acc)

    # Validation
    val_loss, val_acc = (float(metric) for metric in
                         validate(model, test_loader, criterion, device))
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)

    # Report the epoch results
    print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
    print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

    # Learning-rate update
    lr_scheduler(val_loss)

    # Keep the weights with the best validation accuracy
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), 'best_model.pth')
        print(f'New best model saved with validation accuracy: {best_val_acc:.2f}%')

    # Early stopping (tracks validation loss and writes its own checkpoint)
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping")
        break

# Restore the weights that achieved the best validation accuracy.
# map_location keeps the load working when the checkpoint device differs from `device`.
model.load_state_dict(torch.load('best_model.pth', map_location=device))

# Evaluate on the test set
model.eval()
test_loss, test_acc = validate(model, test_loader, criterion, device)
print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%')

# Confusion matrix
y_true = []
y_pred = []

model.eval()
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Passing `labels` fixes the matrix to one row/column per emotion class, even if a
# class never occurs in y_true or y_pred, so the tick labels always line up.
class_ids = list(range(len(emotion_labels)))
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
plt.figure(figsize=(10,8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=emotion_labels, yticklabels=emotion_labels)
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

# Save the model weights
torch.save(model.state_dict(), 'efficient_face_raf_db.pth')
print("Model został zapisany.")

In [None]:
# Learning curves: loss on the left, accuracy on the right
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12,5))

loss_ax.plot(train_losses, label='Train Loss')
loss_ax.plot(val_losses, label='Validation Loss')
loss_ax.set_title('Training and Validation Loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()

acc_ax.plot(train_accuracies, label='Train Accuracy')
acc_ax.plot(val_accuracies, label='Validation Accuracy')
acc_ax.set_title('Training and Validation Accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy')
acc_ax.legend()

fig.tight_layout()
plt.show()

In [None]:
# Evaluate the current model on the test set
model.eval()
test_loss, test_acc = validate(model, test_loader, criterion, device)
print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%')

# Confusion matrix (these two names are also imported in the first cell)
from sklearn.metrics import confusion_matrix
import seaborn as sns

y_true = []
y_pred = []

model.eval()
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Passing `labels` guarantees one row/column per emotion class. Without it a class
# missing from both y_true and y_pred shrinks the matrix and the tick labels no
# longer line up with the rows and columns.
class_ids = list(range(len(emotion_labels)))
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
plt.figure(figsize=(12,10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=emotion_labels, yticklabels=emotion_labels)
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

In [None]:
# Save the entire model object.
# NOTE(review): this serializes the module itself, so loading it presumably needs the
# model class definitions from this notebook to be available — confirm before relying on it.
torch.save(model, 'efficient_face_full_model.pth')

# Save only the model's state dict (recommended)
torch.save(model.state_dict(), 'efficient_face_state_dict.pth')

print("Model został zapisany.")