In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import torch.nn.functional as F
from tqdm import tqdm

# Định nghĩa khối Bottleneck
class Bottleneck(nn.Module):
    def __init__(self, in_channels, out_channels, expansion_factor, stride):
        super(Bottleneck, self).__init__()
        hidden_channels = in_channels * expansion_factor

        self.conv1 = nn.Conv2d(in_channels, hidden_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(hidden_channels)
        self.conv2 = nn.Conv2d(hidden_channels, hidden_channels, kernel_size=3, stride=stride,
                              padding=1, groups=hidden_channels, bias=False)
        self.bn2 = nn.BatchNorm2d(hidden_channels)
        self.conv3 = nn.Conv2d(hidden_channels, out_channels, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels)

        self.stride = stride
        self.use_residual = (in_channels == out_channels) and (stride == 1)

    def forward(self, x):
        out = F.relu6(self.bn1(self.conv1(x)))
        out = F.relu6(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        if self.use_residual:
            out = out + x
        return out

# Định nghĩa mô hình Light-FER MobileNet
class LightFERMobileNet(nn.Module):
    def __init__(self, num_classes=7):
        super(LightFERMobileNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1, bias=False)  # Grayscale input
        self.bn1 = nn.BatchNorm2d(32)
        self.bottlenecks = nn.Sequential(
            Bottleneck(32, 16, 1, 1),
            Bottleneck(16, 24, 6, 2), Bottleneck(24, 24, 6, 1),
            Bottleneck(24, 32, 6, 2), Bottleneck(32, 32, 6, 1), Bottleneck(32, 32, 6, 1),
            Bottleneck(32, 64, 6, 2), Bottleneck(64, 64, 6, 1), Bottleneck(64, 64, 6, 1), Bottleneck(64, 64, 6, 1),
            Bottleneck(64, 96, 6, 1), Bottleneck(96, 96, 6, 1), Bottleneck(96, 96, 6, 1),
            Bottleneck(96, 160, 6, 2), Bottleneck(160, 160, 6, 1), Bottleneck(160, 160, 6, 1),
            Bottleneck(160, 320, 6, 1),
        )
        self.conv2 = nn.Conv2d(320, 1280, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(1280)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(1280, num_classes)

    def forward(self, x):
        x = F.relu6(self.bn1(self.conv1(x)))
        x = self.bottlenecks(x)
        x = F.relu6(self.bn2(self.conv2(x)))
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# Tối ưu hóa: Cắt tỉa và Lượng tử hóa
def apply_pruning(model, pruning_rate=0.3):
    for name, param in model.named_parameters():
        if 'weight' in name:
            tensor = param.data.cpu()
            mask = tensor.abs() > tensor.abs().quantile(pruning_rate)
            param.data = tensor * mask.float().to(param.device)
    return model

def apply_quantization(model):
    model.half()
    return model

# Hàm Huấn luyện
def train(model, train_loader, criterion, optimizer, device, epoch):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in tqdm(train_loader, desc=f"Training Epoch {epoch}"):
        inputs, labels = inputs.to(device).half(), labels.to(device)  # FP16
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100 * correct / total
    print(f"Train Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%")
    return epoch_loss, epoch_acc

# Hàm Đánh giá
def evaluate(model, test_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc="Evaluating"):
            inputs, labels = inputs.to(device).half(), labels.to(device)  # FP16
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_loss = running_loss / len(test_loader)
    test_acc = 100 * correct / total
    print(f"Test Loss: {test_loss:.4f}, Accuracy: {test_acc:.2f}%")
    return test_loss, test_acc

# Ví dụ Dataset tùy chỉnh (giả lập AffectNet)
class FERDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images  # List of grayscale images (numpy arrays)
        self.labels = labels  # List of labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

# Khởi tạo và chạy
if __name__ == "__main__":
    # Thiết bị
    device = torch.device("cpu")

    # Tiền xử lý dữ liệu
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])
    ])

    # Giả lập dữ liệu (thay bằng AffectNet thực tế)
    import numpy as np
    dummy_images = [np.random.rand(224, 224) for _ in range(1000)]
    dummy_labels = np.random.randint(0, 7, size=1000)
    train_dataset = FERDataset(dummy_images, dummy_labels, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    # Khởi tạo mô hình
    model = LightFERMobileNet(num_classes=7).to(device)
    model = apply_pruning(model, pruning_rate=0.3)
    model = apply_quantization(model)

    # Cấu hình huấn luyện
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    num_epochs = 5

    # Huấn luyện và đánh giá
    for epoch in range(1, num_epochs + 1):
        train(model, train_loader, criterion, optimizer, device, epoch)
        evaluate(model, train_loader, criterion, device)  # Dùng train_loader để demo

    # In số tham số
    print(f"Total parameters: {sum(p.numel() for p in model.parameters())}")

Training Epoch 1:   0%|          | 0/32 [00:03<?, ?it/s]


RuntimeError: expected scalar type Long but found Int