In [None]:
import torch
import numpy as np
import pandas as pd
import random
from torchvision import datasets, transforms
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm

In [None]:
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
random.seed(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
BATCH_SIZE = 128
LEARNING_RATE = 0.1
NUM_EPOCHS = 10
MOMENTUM = 0.9

In [None]:
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5071, 0.4865, 0.4409), (0.2673, 0.2564, 0.2762)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5071, 0.4865, 0.4409), (0.2673, 0.2564, 0.2762)),
])

trainset = datasets.CIFAR100(root='./data', train=True, download=True, transform=transform_train)
testset = datasets.CIFAR100(root='./data', train=False, download=True, transform=transform_test)

trainloader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
testloader = torch.utils.data.DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

classes = trainset.classes

print("Bộ dữ liệu CIFAR100 đã được tải và chuẩn bị.")
print(f"Số lượng mẫu huấn luyện: {len(trainset)}")
print(f"Số lượng mẫu kiểm tra: {len(testset)}")
print(f"Số lượng lớp: {len(classes)}")
# print(f"Tên các lớp: {classes}") # Có thể bỏ comment để in ra tên các lớp


In [None]:
num_tasks = 10
classes_per_task = 10
total_classes = len(classes)

# Đảm bảo tổng số lớp chia đều cho số task
if total_classes % classes_per_task != 0 or total_classes / classes_per_task != num_tasks:
    raise ValueError("Số lượng lớp không chia đều cho số task hoặc số task * lớp mỗi task không bằng tổng số lớp.")

# Tạo danh sách các task, mỗi task chứa các chỉ số lớp
task_classes = [list(range(i * classes_per_task, (i + 1) * classes_per_task)) for i in range(num_tasks)]

# Hàm để tạo subset cho một task cụ thể
def create_task_subset(dataset, classes_in_task):
    indices = []
    for i in range(len(dataset)):
        if dataset.targets[i] in classes_in_task:
            indices.append(i)
    return torch.utils.data.Subset(dataset, indices)

# Tạo danh sách các tập dữ liệu cho từng task
train_datasets = [create_task_subset(trainset, task_classes[i]) for i in range(num_tasks)]
test_datasets = [create_task_subset(testset, task_classes[i]) for i in range(num_tasks)]

# Tạo DataLoader cho từng task (ví dụ cho task 0)
# Bạn sẽ cần lặp qua các task để tạo DataLoader cho từng task khi huấn luyện
# train_loaders = [torch.utils.data.DataLoader(ds, batch_size=128, shuffle=True, num_workers=2) for ds in train_datasets]
# test_loaders = [torch.utils.data.DataLoader(ds, batch_size=100, shuffle=False, num_workers=2) for ds in test_datasets]

print("\nĐã chia bộ dữ liệu thành các task cho Class Incremental Learning.")
print(f"Số lượng task: {num_tasks}")
print(f"Số lớp mỗi task: {classes_per_task}")

# In ra số lượng mẫu trong mỗi task (chỉ để kiểm tra)
for i in range(num_tasks):
    print(f"Task {i}: bao gồm các lớp {task_classes[i]}, số mẫu huấn luyện: {len(train_datasets[i])}, số mẫu kiểm tra: {len(test_datasets[i])}")

# Bây giờ bạn có thể lặp qua `train_datasets` và `test_datasets` để huấn luyện mô hình cho từng task

In [None]:
# Định nghĩa block cơ bản cho ResNet
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

# Định nghĩa lớp ResNet
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=100):
        super(ResNet, self).__init__()
        self.in_planes = 16

        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(block, 16, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 32, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 64, num_blocks[2], stride=2)
        self.linear = nn.Linear(64*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.avg_pool2d(out, 8)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Sử dụng device: {device}")

# Khởi tạo mô hình
model = ResNet(BasicBlock, [5, 5, 5], num_classes=classes_per_task).to(device)

In [None]:
# Hàm huấn luyện cho một task
def train_task(model, train_loader, criterion, optimizer, epoch, task_id, current_classes):
    model.train()
    for inputs, targets in (train_loader):
        inputs, targets = inputs.to(device), targets.to(device)

        task_targets = torch.tensor([current_classes.index(t.item()) for t in targets], device=device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, task_targets)
        loss.backward()
        optimizer.step()


# Hàm kiểm tra trên tất cả các task đã học
def test_tasks(model, test_datasets, tasks_learned, task_classes):
    model.eval()
    all_total = 0
    all_correct = 0
    task_accuracies = []

    with torch.no_grad():
        for task_id in range(tasks_learned):
            test_loader = torch.utils.data.DataLoader(test_datasets[task_id], batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
            task_total = 0
            task_correct = 0
            current_classes = task_classes[task_id]

            for inputs, targets in test_loader:
                inputs, targets = inputs.to(device), targets.to(device)

                # Chuyển đổi nhãn gốc sang nhãn tương ứng trong task hiện tại
                task_targets = torch.tensor([current_classes.index(t.item()) for t in targets], device=device)


                outputs = model(inputs)
                _, predicted = outputs.max(1)

                task_total += task_targets.size(0)
                task_correct += predicted.eq(task_targets).sum().item()

            task_acc = 100. * task_correct / task_total if task_total > 0 else 0
            task_accuracies.append(task_acc)
            all_total += task_total
            all_correct += task_correct
            print(f'Test on Task {task_id + 1}: Acc: {task_acc:.2f}%')

    overall_acc = 100. * all_correct / all_total if all_total > 0 else 0
    print(f'Overall Test Accuracy on Learned Tasks ({tasks_learned}/{num_tasks}): {overall_acc:.2f}%')
    return task_accuracies, overall_acc


# Lưu trữ kết quả
results = {
    'task_ids': [],
    'task_accuracies': [],
    'overall_accuracy': []
}

# Huấn luyện tuần tự trên từng task
for task_id in range(num_tasks):
    print(f"\n====== Training Task {task_id + 1}/{num_tasks} (Classes: {task_classes[task_id]}) ======")
    # in thông tin của lớp đầu ra
    print(f"Output layer size: {model.linear.out_features}")
    # Tạo DataLoader cho task hiện tại
    current_train_dataset = train_datasets[task_id]
    current_train_loader = torch.utils.data.DataLoader(current_train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)

    # Khởi tạo optimizer và criterion
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[80, 120], gamma=0.1)

    # Huấn luyện mô hình trên task hiện tại
    for epoch in tqdm(range(NUM_EPOCHS)):
        train_task(model, current_train_loader, criterion, optimizer, epoch, task_id, task_classes[task_id])
        scheduler.step()

    # Kiểm tra mô hình trên tất cả các task đã học
    print(f"\nEvaluating after training Task {task_id + 1}:")
    current_task_accuracies, current_overall_accuracy = test_tasks(model, test_datasets, task_id + 1, task_classes)

    # Lưu kết quả
    results['task_ids'].append(task_id + 1)
    results['task_accuracies'].append(current_task_accuracies)
    results['overall_accuracy'].append(current_overall_accuracy)

    # Thêm nhãn của task hiện tại vào mô hình cho task sau
    if task_id < num_tasks - 1:
        # Thay đổi kích thước lớp đầu ra để bao gồm các lớp mới
        model.linear = nn.Linear(model.linear.in_features, model.linear.out_features + len(task_classes[task_id + 1])).to(device)

# In ra kết quả cuối cùng
print("\n====== Incremental Learning Results ======")
for i in range(num_tasks):
    print(f"After Task {results['task_ids'][i]}:")
    for j in range(results['task_ids'][i]):
        print(f"  Accuracy on Task {j+1}: {results['task_accuracies'][i][j]:.2f}%")
    print(f"  Overall Accuracy on Learned Tasks: {results['overall_accuracy'][i]:.2f}%")
    print("-" * 20)

In [None]:
# (Tùy chọn) Lưu kết quả vào file CSV hoặc DataFrame
# results_df = pd.DataFrame(results)
# print("\nKết quả dưới dạng DataFrame:")
# print(results_df)
# results_df.to_csv('cil_results.csv', index=False)
# print("Kết quả đã được lưu vào cil_results.csv")