<a href="https://colab.research.google.com/github/haoyuh3/ai-accelerator/blob/main/multitask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive so files under /content/drive become readable.
from google.colab import drive
drive.mount('/content/drive')

# NOTE(review): TensorFlow is imported here only to list GPUs; the model code in
# this notebook uses PyTorch, so torch.cuda.is_available() may be a lighter check.

import tensorflow as tf

# Check whether a GPU is available.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print("GPU is available.")
    for gpu in gpus:
        print(f"Device: {gpu}")
else:
    print("No GPU available. Using CPU.")


Mounted at /content/drive
GPU is available.
Device: PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')


In [None]:

# Extract the FER2013+ archive from Google Drive onto the local disk.
import zipfile
from google.colab import drive


# Mounting again is harmless: Colab only prints a notice if Drive is already mounted.
drive.mount('/content/drive')

zip_file_path = '/content/drive/MyDrive/haoyuh3/fer2013plus.zip'
destination_path = '/content/datasetr'

# Unpack every member of the archive below destination_path.
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(destination_path)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Extract the LFPW archive from Google Drive onto the local disk.
# Assumes Drive is already mounted at /content/drive (this cell does not mount it).
import zipfile
from google.colab import drive
zip_file_path = '/content/drive/MyDrive/haoyuh3/lpfw.zip'
destination_path = '/content/lpfw'

# Unpack every member of the archive below destination_path.
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(destination_path)

In [None]:
# Expression (FER2013+) image folders: train and test splits.
train_e_dir = '/content/datasetr/fer2013plus/fer2013/train'
test_e_dir = '/content/datasetr/fer2013plus/fer2013/test'

# LFPW image/landmark folders: train and test splits.
train_lp_dir = '/content/lpfw/trainset'
test_lp_dir = '/content/lpfw/testset'

In [None]:
import os
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np
import torch
from torchvision import transforms
from torchvision.utils import make_grid
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches

class LFPWDataset(Dataset):
    """Face images paired with a bounding box derived from their landmark file.

    Every ``<name>.png`` found in ``root_dir`` is paired with ``<name>.pts`` in
    the same directory. Each item is ``(image, bbox)`` where ``bbox`` is a float
    tensor ``[x_min, y_min, x_max, y_max]`` expressed in the coordinates of a
    ``TARGET_SIZE`` x ``TARGET_SIZE`` image.

    Args:
        root_dir: Directory holding the ``.png`` images and ``.pts`` files.
        transform: Optional callable applied to the PIL image. It is expected to
            resize the image to ``TARGET_SIZE`` so the box stays aligned.
    """

    # Side length (pixels) that bounding boxes are rescaled to.
    TARGET_SIZE = 224

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        # List the directory once and sort it, so images and labels are built
        # from the same names and the sample order is deterministic.
        image_names = sorted(fname for fname in os.listdir(root_dir) if fname.endswith('.png'))
        self.image_paths = [os.path.join(root_dir, fname) for fname in image_names]
        self.label_paths = [os.path.join(root_dir, os.path.splitext(fname)[0] + '.pts') for fname in image_names]

    def __len__(self):
        """Return the number of images found in ``root_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` and its rescaled bounding box.

        Raises:
            OSError: If the image file cannot be decoded by OpenCV.
        """
        image_path = self.image_paths[idx]
        label_path = self.label_paths[idx]
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f'Could not read image: {image_path}')
        original_height, original_width = image.shape[:2]
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = Image.fromarray(image)  # convert the numpy array to a PIL image
        bbox = self.read_bbox(label_path, original_width, original_height)
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(bbox, dtype=torch.float)

    def read_bbox(self, label_path, original_width, original_height):
        """Return the landmarks' bounding box, rescaled to ``TARGET_SIZE``.

        Args:
            label_path: Path to the ``.pts`` landmark file.
            original_width: Width of the source image in pixels.
            original_height: Height of the source image in pixels.

        Returns:
            ``[x_min, y_min, x_max, y_max]`` as Python floats.

        Raises:
            ValueError: If the file contains no landmark points.
        """
        points = []
        with open(label_path, 'r') as file:
            # Skip the 3 header lines and read at most 68 landmark lines.
            lines = file.readlines()[3:71]
        for line in lines:
            line = line.strip()
            # Files with fewer than 68 points have the closing brace (or blank
            # lines) inside the slice; those are not coordinates.
            if not line or line == '}':
                continue
            x, y = line.split()
            points.append((float(x), float(y)))

        if not points:
            raise ValueError(f'No landmark points found in {label_path}')

        points = np.array(points)
        x_min, y_min = points.min(axis=0)
        x_max, y_max = points.max(axis=0)

        # Rescale the box to match the resized image.
        size = self.TARGET_SIZE
        return [float(x_min * size / original_width), float(y_min * size / original_height),
                float(x_max * size / original_width), float(y_max * size / original_height)]







In [None]:
from PIL import Image
import os
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import cv2

class FER2013PlusDataset(Dataset):
    """Expression images stored as one sub-folder per class.

    ``root_dir`` contains one directory per expression; the directory names,
    sorted alphabetically, define the integer class indices.

    Args:
        root_dir: Directory whose sub-directories are the expression classes.
        transform: Optional callable applied to each PIL image.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        self.label_map = self._create_label_map()

        # Walk every expression folder and collect image paths and labels.
        for label_name, label_idx in self.label_map.items():
            label_dir = os.path.join(root_dir, label_name)
            # Sorted so the sample order does not depend on the filesystem.
            for image_name in sorted(os.listdir(label_dir)):
                image_path = os.path.join(label_dir, image_name)
                if not os.path.isfile(image_path):
                    continue  # nested directories are not images
                self.image_paths.append(image_path)
                self.labels.append(label_idx)

    def _create_label_map(self):
        """Map each class folder name to its index (alphabetical order).

        Only visible sub-directories count as classes: stray files and hidden
        folders such as ``.ipynb_checkpoints`` would otherwise crash the scan
        or shift the label indices.
        """
        label_names = sorted(
            name for name in os.listdir(self.root_dir)
            if not name.startswith('.') and os.path.isdir(os.path.join(self.root_dir, name))
        )
        return {label_name: idx for idx, label_name in enumerate(label_names)}

    def __len__(self):
        """Return the number of collected images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)``; the label is a scalar ``torch.long`` tensor."""
        image_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(image_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(label, dtype=torch.long)

# Preprocessing pipelines for the expression images, keyed by split name.
# 'train' applies random augmentation; 'val' and 'test' are deterministic.
transform_fer = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomRotation(5),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
        transforms.RandomResizedCrop(size=224, scale=(0.8, 1.0)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}
# The two evaluation splits share the same steps; each gets its own Compose.
transform_fer.update({
    split: transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    for split in ('val', 'test')
})


In [None]:
pip install timm

Collecting timm
  Downloading timm-1.0.3-py3-none-any.whl (2.3 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.7/2.3 MB[0m [31m21.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m34.4 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch->timm)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch->timm)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch->timm)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch->timm)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none

In [None]:
import timm
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms


class MultiTaskDataset(Dataset):
    """Pair an expression dataset with a bounding-box dataset index by index.

    The combined length is the larger of the two lengths; the shorter dataset
    is cycled (``idx % length``) so every index yields a sample from both.

    Args:
        fer_dataset: Sized, indexable dataset yielding ``(image, label)``.
        lfpw_dataset: Sized, indexable dataset yielding ``(image, bbox)``.

    Raises:
        ValueError: If exactly one of the datasets is empty, since the other
            could then never be paired.
    """

    def __init__(self, fer_dataset, lfpw_dataset):
        self.fer_dataset = fer_dataset
        self.lfpw_dataset = lfpw_dataset
        self.fer_length = len(fer_dataset)
        self.lfpw_length = len(lfpw_dataset)
        self.max_length = max(self.fer_length, self.lfpw_length)
        # Fail early with a clear message instead of a ZeroDivisionError on
        # the first __getitem__ call.
        if self.max_length > 0 and min(self.fer_length, self.lfpw_length) == 0:
            raise ValueError(
                'Cannot pair datasets when one is empty '
                f'(fer={self.fer_length}, lfpw={self.lfpw_length})'
            )

    def __len__(self):
        """Return the length of the longer dataset."""
        return self.max_length

    def __getitem__(self, idx):
        """Return ``(fer_image, fer_label, lfpw_image, lfpw_bbox)`` for ``idx``.

        Raises:
            IndexError: If ``idx`` is at or beyond ``len(self)``. Without this,
                plain iteration over the dataset would never terminate.
        """
        if idx >= self.max_length:
            raise IndexError(f'index {idx} out of range for dataset of length {self.max_length}')

        fer_idx = idx % self.fer_length
        lfpw_idx = idx % self.lfpw_length

        fer_image, fer_label = self.fer_dataset[fer_idx]
        lfpw_image, lfpw_bbox = self.lfpw_dataset[lfpw_idx]

        return fer_image, fer_label, lfpw_image, lfpw_bbox

# Deterministic preprocessing: resize to 224x224, convert to a tensor, then
# normalize each channel with the given mean / std values.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])


import torch
import torch.nn as nn
import timm

import torch
import torch.nn as nn
import timm

class MultiTaskModel(nn.Module):
    """Swin Transformer backbone with two linear heads on its flattened features.

    ``forward`` returns ``(classification, bbox)``: expression logits with
    ``num_classes`` entries and a 4-value bounding-box prediction.
    """

    def __init__(self, num_classes=8):
        super().__init__()
        # Pretrained Swin Transformer backbone from timm.
        self.swin = timm.create_model('swin_small_patch4_window7_224', pretrained=True)
        # Width of the flattened feature map; assumes a 7x7 spatial output.
        flat_dim = self.swin.head.in_features * 7 * 7

        # Drop the backbone's own classification head.
        self.swin.head = nn.Identity()

        # Collapses all non-batch dimensions into one.
        self.flatten = nn.Flatten()

        # Expression-recognition head.
        self.classifier = nn.Linear(flat_dim, num_classes)
        # Bounding-box regression head.
        self.bbox_regressor = nn.Linear(flat_dim, 4)

    def forward(self, x):
        """Run the backbone once and feed both heads from the same features."""
        flat = self.flatten(self.swin.forward_features(x))

        logits = self.classifier(flat)
        box = self.bbox_regressor(flat)
        return logits, box

# # Example usage
# model = MultiTaskModel()
# example_input = torch.randn(16, 3, 224, 224)  # Example input shape
# classification, bbox = model(example_input)
# print("Classification shape:", classification.shape)  # Should be [16, 8]
# print("BBox shape:", bbox.shape)  # Should be [16, 4]





from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau
from torch.cuda.amp import GradScaler, autocast


NameError: name 'Dataset' is not defined

In [None]:

# FER2013Plus dataset (training split, with training augmentation)
fer2013plus_root_dir = train_e_dir  # change this to your FER2013Plus dataset path
fer_dataset = FER2013PlusDataset(root_dir=fer2013plus_root_dir, transform=transform_fer['train'])

# LFPW dataset (training split)
lfpw_root_dir = train_lp_dir  # change this to your LFPW dataset path
lfpw_dataset = LFPWDataset(root_dir=lfpw_root_dir, transform=transform)

# Build the multi-task dataset that pairs samples from both datasets
multi_task_dataset = MultiTaskDataset(fer_dataset, lfpw_dataset)

# Build the data loader
data_loader = DataLoader(multi_task_dataset, batch_size=16, shuffle=True)


In [None]:
import torch

def normalize_bbox(bboxes, image_size):
    """Scale pixel-space boxes by the image size, without modifying the input.

    Args:
        bboxes: Tensor whose first four columns are ``x1, y1, x2, y2`` in pixels.
        image_size: ``(height, width)`` of the image the boxes refer to.

    Returns:
        A new tensor with x columns divided by ``width`` and y columns divided
        by ``height``. Integer inputs are promoted to the default float dtype.
    """
    height, width = image_size
    # Work on a copy so the caller's tensor is left untouched. Integer tensors
    # cannot be divided in place, so they are converted (which also copies).
    if bboxes.is_floating_point():
        normalized = bboxes.clone()
    else:
        normalized = bboxes.to(torch.get_default_dtype())
    normalized[:, 0:4:2] /= width   # x1, x2
    normalized[:, 1:4:2] /= height  # y1, y2
    return normalized

def denormalize_bbox(bboxes, image_size):
    """Scale normalized boxes back to pixels, without modifying the input.

    Args:
        bboxes: Tensor whose first four columns are normalized ``x1, y1, x2, y2``.
        image_size: ``(height, width)`` of the target image.

    Returns:
        A new tensor with x columns multiplied by ``width`` and y columns
        multiplied by ``height``.
    """
    height, width = image_size
    # Work on a copy so the caller's tensor is left untouched.
    pixels = bboxes.clone()
    pixels[:, 0:4:2] *= width   # x1, x2
    pixels[:, 1:4:2] *= height  # y1, y2
    return pixels



In [None]:
from tqdm import tqdm

def train_model(model, dataloader, criterion_classification, criterion_regression, optimizer, scheduler, scaler, num_epochs=10, device='cuda'):
    """Jointly train the classification and bounding-box heads.

    Each batch yields ``(fer_images, fer_labels, lfpw_images, lfpw_bboxes)``.
    The model is run once on each image set; the classification loss and the
    regression loss (against size-normalized boxes) are summed and optimized
    under autocast with gradient scaling.

    Args:
        model: Module returning ``(classification, bbox)`` from ``model(images)``.
        dataloader: Loader yielding the four-tuple above; ``dataloader.dataset``
            must support ``len``.
        criterion_classification: Loss for ``(logits, labels)``.
        criterion_regression: Loss for ``(predicted_boxes, normalized_boxes)``.
        optimizer: Optimizer over the model's parameters.
        scheduler: LR scheduler; ``step()`` is called once per epoch.
        scaler: Gradient scaler used for the backward pass and optimizer step.
        num_epochs: Number of passes over ``dataloader``.
        device: Device the model and batches are moved to.

    Returns:
        The model loaded with the weights of the epoch that reached the highest
        training accuracy.
    """
    import copy  # local import: only needed to snapshot the best weights

    model = model.to(device)
    # state_dict() returns references to the live parameters, so it must be
    # deep-copied; otherwise the "best" weights silently track the latest ones.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    num_samples = len(dataloader.dataset)

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        model.train()  # Training mode

        running_loss = 0.0
        running_corrects = 0

        # Iterate over data with a progress bar
        for fer_images, fer_labels, lfpw_images, lfpw_bboxes in tqdm(dataloader, desc=f'Epoch {epoch}/{num_epochs - 1}'):
            fer_images = fer_images.to(device)
            fer_labels = fer_labels.to(device)
            lfpw_images = lfpw_images.to(device)
            lfpw_bboxes = lfpw_bboxes.to(device)

            optimizer.zero_grad()

            with autocast():
                # Expression recognition task
                outputs_classification, _ = model(fer_images)
                loss_classification = criterion_classification(outputs_classification, fer_labels)

                # Bounding box regression task
                _, outputs_regression = model(lfpw_images)
                image_size = lfpw_images.size(2), lfpw_images.size(3)
                normalized_lfpw_bboxes = normalize_bbox(lfpw_bboxes, image_size)

                loss_regression = criterion_regression(outputs_regression, normalized_lfpw_bboxes)

                # Total loss
                loss = loss_classification + loss_regression

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            running_loss += loss.item() * fer_images.size(0)
            # Accumulate as a Python int so the statistics below also work when
            # the loader yields no batches.
            running_corrects += torch.sum(torch.argmax(outputs_classification, dim=1) == fer_labels).item()

        epoch_loss = running_loss / num_samples
        epoch_acc = running_corrects / num_samples

        print(f'Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

        # Snapshot the weights of the best epoch so far.
        if epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())

        scheduler.step()

        print()

    print(f'Best Train Acc: {best_acc:.4f}')

    model.load_state_dict(best_model_wts)
    return model

# Train the model
model = MultiTaskModel(num_classes=8)
criterion_classification = nn.CrossEntropyLoss()
criterion_regression = nn.MSELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-2)
# Learning-rate scheduler; T_max equals the num_epochs passed to train_model below
scheduler = CosineAnnealingLR(optimizer, T_max=10)

# Use mixed-precision training
scaler = GradScaler()
model = train_model(model, data_loader, criterion_classification, criterion_regression, optimizer, scheduler, scaler, num_epochs=10)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


model.safetensors:   0%|          | 0.00/200M [00:00<?, ?B/s]

Epoch 0/9
----------


Epoch 0/9: 100%|██████████| 1775/1775 [15:24<00:00,  1.92it/s]


Loss: 0.8578 Acc: 0.7017

Epoch 1/9
----------


Epoch 1/9: 100%|██████████| 1775/1775 [15:21<00:00,  1.93it/s]


Loss: 0.6450 Acc: 0.7744

Epoch 2/9
----------


Epoch 2/9: 100%|██████████| 1775/1775 [15:23<00:00,  1.92it/s]


Loss: 0.5665 Acc: 0.8003

Epoch 3/9
----------


Epoch 3/9: 100%|██████████| 1775/1775 [15:12<00:00,  1.95it/s]


Loss: 0.5057 Acc: 0.8214

Epoch 4/9
----------


Epoch 4/9: 100%|██████████| 1775/1775 [15:13<00:00,  1.94it/s]


Loss: 0.4387 Acc: 0.8441

Epoch 5/9
----------


Epoch 5/9: 100%|██████████| 1775/1775 [15:22<00:00,  1.92it/s]


Loss: 0.3712 Acc: 0.8682

Epoch 6/9
----------


Epoch 6/9: 100%|██████████| 1775/1775 [15:20<00:00,  1.93it/s]


Loss: 0.3129 Acc: 0.8857

Epoch 7/9
----------


Epoch 7/9: 100%|██████████| 1775/1775 [15:41<00:00,  1.88it/s]


Loss: 0.2622 Acc: 0.9050

Epoch 8/9
----------


Epoch 8/9: 100%|██████████| 1775/1775 [15:35<00:00,  1.90it/s]


Loss: 0.2248 Acc: 0.9190

Epoch 9/9
----------


Epoch 9/9: 100%|██████████| 1775/1775 [15:39<00:00,  1.89it/s]

Loss: 0.2001 Acc: 0.9263

Best Train Acc: 0.9263





In [None]:

# FER2013Plus dataset (test split). Use the deterministic 'test' pipeline:
# random training augmentation would make the reported accuracy vary per run.
fer2013plus_root_dir = test_e_dir  # change this to your FER2013Plus dataset path
fer_dataset = FER2013PlusDataset(root_dir=fer2013plus_root_dir, transform=transform_fer['test'])

# LFPW dataset (test split)
lfpw_root_dir = test_lp_dir  # change this to your LFPW dataset path
lfpw_dataset = LFPWDataset(root_dir=lfpw_root_dir, transform=transform)

# Build the multi-task dataset
multi_task_dataset_test = MultiTaskDataset(fer_dataset, lfpw_dataset)

# Build the data loader; evaluation needs no shuffling
data_loader_test = DataLoader(multi_task_dataset_test, batch_size=16, shuffle=False)

In [None]:
# Count correct expression predictions over the test loader.
model.eval()
test_corrects = 0
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Make sure the model lives on the same device as the batches.
model = model.to(device)
# Inference only: disabling autograd avoids building graphs and saves memory.
with torch.no_grad():
    # The LFPW half of each batch is not needed for classification accuracy,
    # so it is not copied to the device.
    for inputs, labels, lfpw_images, lfpw_bboxes in tqdm(data_loader_test, desc='Testing'):
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs, _ = model(inputs)
        _, preds = torch.max(outputs, 1)
        test_corrects += torch.sum(preds == labels)



Testing: 100%|██████████| 444/444 [02:31<00:00,  2.92it/s]


In [None]:
# Accuracy = correct predictions / number of items in the test dataset.
test_acc = test_corrects.double() / len(data_loader_test.dataset)
print(f'Test Accuracy: {test_acc:.4f}')

Test Accuracy: 0.8410


In [None]:
# Save the trained weights to the current working directory.
# NOTE(review): the fine-tuning cell loads this file name from
# /content/drive/MyDrive/haoyuh3/ - confirm the file is copied there, or save to Drive directly.
torch.save(model.state_dict(), 'swing_trans_multi.pth')

In [None]:
# Reload the saved weights and continue training.
model = MultiTaskModel(num_classes=8)
# map_location='cpu' lets a checkpoint saved from GPU tensors load on any
# machine; train_model moves the model to the training device afterwards.
model.load_state_dict(torch.load('/content/drive/MyDrive/haoyuh3/swing_trans_multi.pth', map_location='cpu'))
criterion_classification = nn.CrossEntropyLoss()
criterion_regression = nn.MSELoss()
# NOTE(review): lr=1e-8 is four orders of magnitude below the initial run's 1e-4 - confirm it is intended.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-8, weight_decay=1e-2)
# Learning-rate scheduler
# NOTE(review): T_max=10 while only 5 epochs are run, so the cosine schedule stops halfway - confirm.
scheduler = CosineAnnealingLR(optimizer, T_max=10)

# Use mixed-precision training
scaler = GradScaler()
model = train_model(model, data_loader, criterion_classification, criterion_regression, optimizer, scheduler, scaler, num_epochs=5)


NameError: name 'MultiTaskModel' is not defined