In [None]:
# Mount Google Drive at /content/drive. This import only exists on Google Colab,
# so the cell fails on any other runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install torch torchvision torchsummary numpy matplotlib

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
from torchvision.transforms import AutoAugment, AutoAugmentPolicy
import torchvision.transforms as transforms
import numpy as np
from torch.utils.data import TensorDataset, DataLoader, Subset
from torchsummary import summary
import matplotlib.pyplot as plt
import os
from torch.utils.data import Dataset, DataLoader

In [None]:
# Task A dataset description: one entry per model.
#   "train" / "test": checkpoint paths relative to `model_folder`.
#   "classes": maps each original dataset label to the 0-4 index used as the
#              training target for that model.
file_map = [
    {
        "train": "Model1/model1_train.pth",
        "test": "Model1/model1_test.pth",
        "classes": {0: 0, 40: 1, 10: 2, 20: 3, 30: 4}
    },
    {
        "train": "Model2/model2_train.pth",
        "test": "Model2/model2_test.pth",
        "classes": {1: 0, 41: 1, 11: 2, 21: 3, 31: 4}
    },
    {
        "train": "Model3/model3_train.pth",
        "test": "Model3/model3_test.pth",
        "classes": {32: 0, 2: 1, 42: 2, 12: 3, 22: 4}
    }
]
# Root folder the Task A paths are joined onto.
# NOTE(review): the next cell assigns `model_folder` again (to the Task 2 folder),
# so after a top-to-bottom run this value is overwritten.
model_folder = './drive/MyDrive/TaskA/Task1_data'

In [None]:
# Task 2 (dataset B) description: one entry per model, same schema as `file_map`.
#   "train" / "test": checkpoint paths relative to `model_folder`.
#   "classes": maps each original dataset label to a 0-4 training target.
# Some original labels are shared between entries (34 in models 1 and 2,
# 173 in models 1 and 3, 202 in models 2 and 3) but map to different indices.
task2_file_map = [
    {
        "train": "train_dataB_model_1.pth",
        "test": "val_dataB_model_1.pth",
        "classes": {34: 0, 137: 1, 159: 2, 173: 3, 201: 4}
    },
    {
        "train": "train_dataB_model_2.pth",
        "test": "val_dataB_model_2.pth",
        "classes": {24: 0, 34: 1, 80: 2, 135: 3, 202: 4}
    },
    {
        "train": "train_dataB_model_3.pth",
        "test": "val_dataB_model_3.pth",
        "classes": {124: 0, 125: 1, 130: 2, 173: 3, 202: 4}
    }
]
# Root folder the Task 2 paths are joined onto.
# NOTE(review): this overrides the Task A `model_folder` assigned in the previous
# cell; switching the training loop back to `file_map` also requires resetting it.
model_folder = './drive/MyDrive/Task2_data'

In [None]:
def display_images(data, labels):
    """Plot one example image per distinct label in a single row.

    Args:
        data: Indexable collection of images; each image is converted to a
            ``uint8`` array before being handed to ``imshow``.
        labels: Sequence of labels aligned with ``data``.

    For every unique label the first matching sample is shown with the label
    as its title.
    """
    # Coerce to an array so the element-wise comparison below also works for lists.
    labels = np.asarray(labels)
    unique_labels = np.unique(labels)
    num_classes = len(unique_labels)

    # squeeze=False always yields a 2-D array of Axes; without it a single
    # class returns a bare Axes object and indexing it raises a TypeError.
    fig, axes = plt.subplots(1, num_classes, figsize=(15, 5), squeeze=False)
    for ax, label in zip(axes[0], unique_labels):
        idx = np.where(labels == label)[0][0]  # first sample carrying this label
        image = np.asarray(data[idx])
        ax.imshow(image.astype(np.uint8))
        ax.set_title(f'Label: {label}')
        ax.axis('off')
    plt.show()

def load_pth(file_path):
    """Load a ``.pth`` checkpoint and return its images and labels as NumPy arrays.

    The file must deserialize to a mapping with tensor entries under the keys
    ``'data'`` and ``'labels'``.

    Returns:
        tuple: ``(data, labels)`` as ``numpy.ndarray`` objects.
    """
    checkpoint = torch.load(file_path)
    images = checkpoint['data'].numpy()
    targets = checkpoint['labels'].numpy()
    return images, targets

# Sanity check: load one training split and show the first image of every label.
# The path is hard-coded to the Task 2 / model 1 training file; the commented
# line below is the Task A equivalent.
test_train_data, test_train_labels = load_pth('./drive/MyDrive/Task2_data/train_dataB_model_1.pth')
# test_train_data, test_train_labels = load_pth('./drive/MyDrive/TaskA/Task1_data/Model1/model1_train.pth')
print('Displaying images for all classes:')
display_images(test_train_data, test_train_labels)

In [None]:
# Training pipeline: random geometric and colour augmentation, then conversion
# to a tensor and per-channel normalisation. RandomCrop(32, padding=4) fixes the
# output size at 32x32.
# NOTE(review): the mean/std values look like CIFAR-100 channel statistics -
# confirm they match the dataset actually being loaded.
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.RandomCrop(32, padding=4),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.02),
    # AutoAugment(AutoAugmentPolicy.CIFAR10),  # automatic augmentation policy
    # transforms.ColorJitter(
    #     brightness=0.2,
    #     contrast=0.2,
    #     saturation=0.2,
    #     hue=0.1
    # ),
    # transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5071, 0.4867, 0.4408], std=[0.2675, 0.2565, 0.2761])
  ])

# Evaluation pipeline: no augmentation, only tensor conversion and the same
# normalisation as the training pipeline.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5071, 0.4867, 0.4408], std=[0.2675, 0.2565, 0.2761])
])

In [None]:
class CustomDataset(Dataset):
    """Map-style dataset pairing stored samples with labels.

    Tensor samples are rearranged into a 3-channel, channel-first layout and
    converted to a PIL image before the optional transform runs; samples of
    any other type go to the transform unchanged.

    Args:
        data: Indexable collection of samples.
        labels: Indexable collection of targets; its length defines the
            dataset length.
        transform: Optional callable applied to each sample.
    """

    def __init__(self, data, labels, transform=None):
        self.data = data
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of labels, which is the dataset size."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for position ``idx``."""
        sample, target = self.data[idx], self.labels[idx]

        if isinstance(sample, torch.Tensor):
            rank = sample.dim()
            if rank == 2:
                # Single-channel image: add a leading channel axis.
                sample = sample.unsqueeze(0)
            elif rank == 3 and sample.size(0) > 4:
                # A first axis larger than 4 is treated as height, i.e. the
                # tensor is taken to be [H, W, C]; move channels to the front.
                sample = sample.permute(2, 0, 1)

            # Anything that still lacks three channels is forced to (3, 32, 32).
            if sample.size(0) != 3:
                sample = sample.view(3, 32, 32)

            sample = transforms.functional.to_pil_image(sample)

        if self.transform:
            sample = self.transform(sample)

        return sample, target

In [None]:
def _remap_labels(labels, class_map):
    """Translate original dataset labels into training indices via ``class_map``.

    Each label may be a plain number or a 0-d tensor; tensors are unwrapped
    with ``.item()`` before the lookup. Returns a ``torch.long`` tensor.
    """
    remapped = [
        class_map[label.item() if isinstance(label, torch.Tensor) else label]
        for label in labels
    ]
    return torch.tensor(remapped, dtype=torch.long)


def _load_split(path, class_map, transform):
    """Load one ``.pth`` split and wrap it in a ``CustomDataset``."""
    # NOTE(security): torch.load unpickles the file - only load trusted checkpoints.
    split = torch.load(path)
    targets = _remap_labels(split['labels'], class_map)
    return CustomDataset(split['data'], targets, transform=transform)


def create_dataloaders(file,batch_size=32):
    """Build the train and test ``DataLoader`` objects for one file-map entry.

    Args:
        file: Dict with ``"train"`` and ``"test"`` paths (relative to the
            module-level ``model_folder``) and a ``"classes"`` mapping from
            original label to training index.
        batch_size: Batch size used by both loaders.

    Returns:
        tuple: ``(train_loader, test_loader)``. The training loader is
        shuffled and augmented with ``train_transform``; the test loader
        keeps the stored order and uses the deterministic ``test_transform``.

    Raises:
        KeyError: If a stored label is missing from ``file["classes"]``.
    """
    train_path = os.path.join(model_folder, file["train"])
    test_path = os.path.join(model_folder, file["test"])

    train_dataset = _load_split(train_path, file['classes'], train_transform)
    # Evaluation data must not be randomly augmented, otherwise the reported
    # accuracy is noisy and biased.
    test_dataset = _load_split(test_path, file['classes'], test_transform)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, test_loader

In [None]:
class CNN(nn.Module):
    """Three-stage convolutional classifier.

    Each stage is a stack of Conv-BatchNorm-ReLU units followed by 2x2 max
    pooling and ``Dropout2d(0.2)``. The classifier head expects a 256x4x4
    feature map, which is what three poolings produce from a 32x32 input.

    Args:
        num_classes: Size of the output logit vector.
        dropout_rate: Dropout probability used in the classifier head only.
    """

    def __init__(self, num_classes=5, dropout_rate=0.2):
        super().__init__()

        def conv_unit(in_channels, out_channels):
            # 3x3 convolution that keeps spatial size, then BatchNorm and ReLU.
            return [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]

        def stage_tail():
            # Halve the spatial size, then apply channel-wise dropout.
            return [nn.MaxPool2d(2), nn.Dropout2d(0.2)]

        self.block1 = nn.Sequential(
            *conv_unit(3, 64), *conv_unit(64, 64), *stage_tail()
        )
        self.block2 = nn.Sequential(
            *conv_unit(64, 128), *conv_unit(128, 128), *stage_tail()
        )
        self.block3 = nn.Sequential(
            *conv_unit(128, 256), *stage_tail()
        )
        self.classifier = nn.Sequential(
            nn.Linear(256 * 4 * 4, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout_rate),
            nn.Linear(128, num_classes),
        )

        self._initialize_weights()

    def _initialize_weights(self):
        """Re-initialise conv, batch-norm and linear parameters in place."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                # Only the conv weight is touched; the bias keeps its default init.
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        """Return raw class logits (no softmax) for a batch of images."""
        features = x
        for stage in (self.block1, self.block2, self.block3):
            features = stage(features)
        flat = features.view(features.size(0), -1)
        return self.classifier(flat)

In [None]:
def main():
    """Train one independent CNN per entry of ``task2_file_map`` and report test accuracy.

    For every entry a fresh ``CNN`` is trained for a fixed number of epochs
    with Adam and label-smoothed cross entropy, printing the mean training
    loss per epoch and finally the accuracy on that entry's test loader.
    """
    device = torch.device("cpu")
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_built() and torch.backends.mps.is_available():
        device = torch.device("mps")
    print(device)

    batch_size = 32
    learning_rate = 0.001
    weight_decay = 0.0001
    epochs = 100

    #testA: file_map
    #testB: task2_file_map
    for file in task2_file_map:
        net = CNN(num_classes=5).to(device)
        # Pass the configured batch size explicitly instead of relying on the
        # callee's default happening to match.
        train_loader, test_loader = create_dataloaders(file, batch_size=batch_size)
        # use cross entropy loss and adam optimizer
        loss_function = nn.CrossEntropyLoss(label_smoothing=0.1)
        optimizer = optim.Adam(net.parameters(), lr=learning_rate, weight_decay=weight_decay)

        # training
        # One history per model; creating it inside the epoch loop would keep
        # only the latest epoch.
        loss_history = []
        net.train()

        for epoch in range(epochs):
            running_loss = 0.0

            for inputs, labels in train_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()
                outputs = net(inputs)
                loss = loss_function(outputs, labels)
                loss.backward()
                optimizer.step()
                # Weight by batch size so the epoch figure is a per-sample mean.
                running_loss += loss.item() * inputs.size(0)
            epoch_loss = running_loss / len(train_loader.dataset)
            loss_history.append(epoch_loss)
            print(f"Epoch {epoch + 1}/{epochs} Loss: {epoch_loss:.4f}")

        print('Finished Training')

        net.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                outputs = net(inputs)
                _, preds = torch.max(outputs, 1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        accuracy = correct / total
        print(f"Test Accuracy: {accuracy * 100:.2f}%")

if __name__ == "__main__":
    main()

cuda


  train_data = torch.load(train_path)
  test_data = torch.load(test_path)


Epoch 1/100 Loss: 1.5376
Epoch 2/100 Loss: 1.5116
Epoch 3/100 Loss: 1.5150
Epoch 4/100 Loss: 1.5076
Epoch 5/100 Loss: 1.4931
Epoch 6/100 Loss: 1.4893
Epoch 7/100 Loss: 1.4910
Epoch 8/100 Loss: 1.4726
Epoch 9/100 Loss: 1.4650
Epoch 10/100 Loss: 1.4607
Epoch 11/100 Loss: 1.4581
Epoch 12/100 Loss: 1.4482
Epoch 13/100 Loss: 1.4452
Epoch 14/100 Loss: 1.4257
Epoch 15/100 Loss: 1.4259
Epoch 16/100 Loss: 1.4265
Epoch 17/100 Loss: 1.4324
Epoch 18/100 Loss: 1.4150
Epoch 19/100 Loss: 1.4214
Epoch 20/100 Loss: 1.4263
Epoch 21/100 Loss: 1.4248
Epoch 22/100 Loss: 1.4148
Epoch 23/100 Loss: 1.3990
Epoch 24/100 Loss: 1.3922
Epoch 25/100 Loss: 1.4065
Epoch 26/100 Loss: 1.4151
Epoch 27/100 Loss: 1.3985
Epoch 28/100 Loss: 1.3930
Epoch 29/100 Loss: 1.3902
Epoch 30/100 Loss: 1.3848
Epoch 31/100 Loss: 1.3874
Epoch 32/100 Loss: 1.3786
Epoch 33/100 Loss: 1.3850
Epoch 34/100 Loss: 1.3863
Epoch 35/100 Loss: 1.3567
Epoch 36/100 Loss: 1.3738
Epoch 37/100 Loss: 1.3556
Epoch 38/100 Loss: 1.3694
Epoch 39/100 Loss: 1.

In [None]:
# Pick the compute device once for the whole cell: CUDA first, then Apple MPS,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_built() and torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(device)

class EnsembleModel(nn.Module):
    """Container holding several independent ``CNN`` members.

    Args:
        num_models: Number of ``CNN`` instances to create.
        num_classes: Output size passed to every member.
    """

    def __init__(self, num_models=3, num_classes=5):
        super().__init__()
        members = []
        for _ in range(num_models):
            members.append(CNN(num_classes=num_classes))
        # ModuleList registers every member's parameters with this module.
        self.models = nn.ModuleList(members)

    def forward(self, x, model_idx):
        """Run ``x`` through the member selected by ``model_idx`` only."""
        member = self.models[model_idx]
        return member(x)

def create_weighted_loss(file_map):
    """Build a label-smoothed cross-entropy loss with per-class weights.

    Counts how often each mapped class index (the values of every entry's
    ``"classes"`` dict) occurs across ``file_map``; an index seen more than
    once gets weight ``1 / count``, all others keep weight 1. The weight
    vector has a fixed length of 5 and is moved to the module-level ``device``.

    NOTE(review): in the file maps defined in this notebook every entry maps
    onto the indices 0-4, so each index is counted once per entry and the
    resulting weights are all equal.
    """
    mapped_indices = [
        class_idx
        for entry in file_map
        for class_idx in entry['classes'].values()
    ]

    class_weights = torch.ones(5)
    for class_idx in set(mapped_indices):
        occurrences = mapped_indices.count(class_idx)
        if occurrences > 1:
            class_weights[class_idx] = 1.0 / occurrences

    return nn.CrossEntropyLoss(weight=class_weights.to(device), label_smoothing=0.1)

def train_ensemble_model(ensemble, train_loaders, test_loaders, epochs=100):
    """Train every member of ``ensemble`` on its own loader, one epoch at a time.

    Each member has its own AdamW optimizer and cosine warm-restart scheduler.
    Within an epoch the members are trained one after another; each is
    validated on its test loader straight after its training pass.

    Args:
        ensemble: ``EnsembleModel`` whose ``models`` are trained in place.
        train_loaders: One training ``DataLoader`` per member, index-aligned.
        test_loaders: One validation ``DataLoader`` per member, index-aligned.
        epochs: Number of passes over each training loader.

    Returns:
        The same ``ensemble`` instance after training.
    """
    criterion = create_weighted_loss(task2_file_map)
    optimizers = [
        optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
        for model in ensemble.models
    ]
    schedulers = [
        optim.lr_scheduler.CosineAnnealingWarmRestarts(
            optimizer, T_0=10, T_mult=2, eta_min=1e-6
        )
        for optimizer in optimizers
    ]

    # Best validation accuracy seen so far per member. It is kept for
    # inspection only and is not returned, so the return value is unchanged.
    best_accuracies = [0.0] * len(ensemble.models)

    for epoch in range(epochs):
        for model_idx, model in enumerate(ensemble.models):
            # evaluate_model() below switches the WHOLE ensemble to eval mode.
            # Train mode must be restored before every member's pass, otherwise
            # all members after the first train with Dropout disabled and
            # BatchNorm running statistics frozen.
            ensemble.train()

            optimizer = optimizers[model_idx]
            scheduler = schedulers[model_idx]
            running_loss = 0.0

            for inputs, labels in train_loaders[model_idx]:
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()
                outputs = ensemble(inputs, model_idx)
                loss = criterion(outputs, labels)
                loss.backward()

                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

                optimizer.step()
                # NOTE(review): the scheduler advances once per batch, so T_0=10
                # means a restart after 10 batches, not 10 epochs - confirm that
                # this is intended.
                scheduler.step()

                # Weight by batch size so the epoch figure is a per-sample mean.
                running_loss += loss.item() * inputs.size(0)

            epoch_loss = running_loss / len(train_loaders[model_idx].dataset)

            # Validation phase
            val_acc = evaluate_model(
                ensemble, model_idx, test_loaders[model_idx], criterion, device
            )
            best_accuracies[model_idx] = max(best_accuracies[model_idx], val_acc)

            print(f'Model {model_idx + 1} - Epoch {epoch + 1}/{epochs} - Loss: {epoch_loss:.4f}')

    return ensemble

def evaluate_model(ensemble, model_idx, loader, criterion, device):
    """Return the top-1 accuracy (in percent) of one ensemble member.

    Args:
        ensemble: Module called as ``ensemble(inputs, model_idx)``; it is put
            in eval mode and left that way.
        model_idx: Index of the member to evaluate.
        loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Unused; accepted for call-site compatibility.
        device: Device the batches are moved to.

    Returns:
        float: ``100 * correct / total`` over all batches in ``loader``.
    """
    ensemble.eval()
    hits, seen = 0, 0

    with torch.no_grad():
        for batch_inputs, batch_labels in loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            logits = ensemble(batch_inputs, model_idx)
            guesses = torch.max(logits, dim=1).indices
            seen += batch_labels.size(0)
            hits += (guesses == batch_labels).sum().item()

    return 100. * hits / seen

def main():
    """Train the three-member ensemble on ``task2_file_map`` and print test accuracies.

    NOTE: this definition replaces the ``main`` defined in the earlier
    single-model training cell once this cell has run.
    """
    # Use the module-level `device` selected at the top of this cell.
    # create_weighted_loss() and train_ensemble_model() read that same global,
    # so choosing a separate local device here could place the model and the
    # loss weights on different devices (for example on an MPS machine).
    print(f"Using device: {device}")

    batch_size = 128
    ensemble = EnsembleModel().to(device)

    # Create dataloaders for each model
    train_loaders = []
    test_loaders = []
    for file in task2_file_map:
        train_loader, test_loader = create_dataloaders(file, batch_size=batch_size)
        train_loaders.append(train_loader)
        test_loaders.append(test_loader)

    # Train ensemble
    ensemble = train_ensemble_model(
        ensemble, train_loaders, test_loaders, epochs=100
    )

    # Final evaluation
    print("\nFinal Test Accuracies:")
    total_acc = 0
    for i in range(len(ensemble.models)):
        acc = evaluate_model(ensemble, i, test_loaders[i], None, device)
        total_acc += acc
        print(f"Model {i + 1}: {acc:.2f}%")

    avg_acc = total_acc / len(ensemble.models)
    print("="*50)
    print(f"Average Model Accuracy: {avg_acc:.2f}%")
    print("="*50)

if __name__ == "__main__":
    main()

cuda
Using device: cuda


  train_data = torch.load(train_path)
  test_data = torch.load(test_path)


Model 1 - Epoch 1/100 - Loss: 1.5599
Model 2 - Epoch 1/100 - Loss: 1.5921
Model 3 - Epoch 1/100 - Loss: 1.6018
Model 1 - Epoch 2/100 - Loss: 1.5128
Model 2 - Epoch 2/100 - Loss: 1.5899
Model 3 - Epoch 2/100 - Loss: 1.5908
Model 1 - Epoch 3/100 - Loss: 1.4869
Model 2 - Epoch 3/100 - Loss: 1.5374
Model 3 - Epoch 3/100 - Loss: 1.5873
Model 1 - Epoch 4/100 - Loss: 1.4878
Model 2 - Epoch 4/100 - Loss: 1.5135
Model 3 - Epoch 4/100 - Loss: 1.5525
Model 1 - Epoch 5/100 - Loss: 1.4690
Model 2 - Epoch 5/100 - Loss: 1.4929
Model 3 - Epoch 5/100 - Loss: 1.5321
Model 1 - Epoch 6/100 - Loss: 1.4556
Model 2 - Epoch 6/100 - Loss: 1.4910
Model 3 - Epoch 6/100 - Loss: 1.6139
Model 1 - Epoch 7/100 - Loss: 1.4473
Model 2 - Epoch 7/100 - Loss: 1.5905
Model 3 - Epoch 7/100 - Loss: 1.5411
Model 1 - Epoch 8/100 - Loss: 1.4466
Model 2 - Epoch 8/100 - Loss: 1.4964
Model 3 - Epoch 8/100 - Loss: 1.4910
Model 1 - Epoch 9/100 - Loss: 1.4336
Model 2 - Epoch 9/100 - Loss: 1.4745
Model 3 - Epoch 9/100 - Loss: 1.4866
M