In [6]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
import torchvision.transforms as transforms
from torchvision import datasets, transforms
import numpy as np

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
data_root = "/content/drive/MyDrive/Graduate_DeepL/cinic-10"
print("Root contents: ", os.listdir(data_root))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Root contents:  ['valid', 'train', 'test']


In [9]:
transform = transforms.Compose([
    # Resize the input
    transforms.Resize((32, 32)),
    # Scale the data to [0.0, 1.0]
    transforms.ToTensor(),
    # Shift and scale the pixel values to [-1.0, 1.0]
    transforms.Normalize([0.47889522, 0.47227842, 0.43047404], [0.24205776, 0.23828046, 0.25874835]),


])

#
# train_ds = datasets.ImageFolder(os.path.join(data_root, "train"), transform=transform)
# val_ds   = datasets.ImageFolder(os.path.join(data_root, "valid"), transform=transform)
# test_ds  = datasets.ImageFolder(os.path.join(data_root, "test"),  transform=transform)


In [10]:
full_train_ds = datasets.ImageFolder(os.path.join(data_root, "train"), transform=transform)
full_val_ds   = datasets.ImageFolder(os.path.join(data_root, "valid"), transform=transform)
full_test_ds  = datasets.ImageFolder(os.path.join(data_root, "test"),  transform=transform)


In [11]:
def get_subset(dataset, fraction=0.1):
    # Use dataset.targets if available (much faster than indexing each sample)
    if hasattr(dataset, "targets"):
        print("yes")
        targets = np.array(dataset.targets)
    else:
        targets = np.array([dataset[i][1] for i in range(len(dataset))])  # fallback

    classes = np.unique(targets)
    indices = []

    for cls in classes:
        cls_indices = np.where(targets == cls)[0]
        np.random.shuffle(cls_indices)
        take = int(len(cls_indices) * fraction)
        indices.extend(cls_indices[:take])

    np.random.shuffle(indices)  # shuffle across classes
    return Subset(dataset, indices)


In [12]:

train_ds = get_subset(full_train_ds, fraction=0.25)
test_ds  = get_subset(full_test_ds, fraction=0.20)
val_ds  = get_subset(full_val_ds, fraction=0.20)




yes
yes
yes


In [13]:
train_loader = DataLoader(train_ds, batch_size=128, shuffle=True, num_workers=2, pin_memory=True)
val_loader   = DataLoader(val_ds, batch_size=128, shuffle=False, num_workers=2, pin_memory=True)
test_loader  = DataLoader(test_ds, batch_size=128, shuffle=False, num_workers=2, pin_memory=True)

In [14]:
num_classes = len(full_train_ds.classes)
print(num_classes)

10


In [15]:

class CNN(nn.Module):
    def __init__(self, num_classes=10):
        super(CNN, self).__init__()
        self.features = nn.Sequential(
            # Conv Layer 1
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),  # input: [B, 3, 32, 32]
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # [B, 32, 16, 16]

            # Conv Layer 2
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1), # [B, 64, 16, 16]
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # [B, 64, 8, 8]

            # Conv Layer 3
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1), # [B, 128, 8, 8]
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)   # [B, 128, 4, 4]
        )

        self.classifier = nn.Sequential(
            nn.Flatten(),                           # [B, 128*4*4] = [B, 2048]
            nn.Linear(128*4*4, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x


In [16]:
model = CNN(num_classes=num_classes).to(device)


In [17]:
# -----------------------
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)

In [18]:

def train_epoch(model, loader):
    model.train()
    running_loss, running_corrects, total = 0, 0, 0
    for x, y in loader:
    # for x, y in tqdm(loader, desc="train", leave=False):
      x, y = x.to(device), y.to(device)
      optimizer.zero_grad()
      out = model(x)
      loss = criterion(out, y)
      loss.backward()
      optimizer.step()
      running_loss += loss.item() * x.size(0)
      _, preds = torch.max(out, 1)
      running_corrects += (preds == y).sum().item()
      total += x.size(0)
    return running_loss/total, running_corrects/total

In [19]:
def eval_epoch(model, loader):
    model.eval()
    running_loss, running_corrects, total = 0, 0, 0
    with torch.no_grad():
      for x, y in loader:
      # for x, y in tqdm(loader, desc="eval", leave=False)
          x, y = x.to(device), y.to(device)
          out = model(x)
          loss = criterion(out, y)
          running_loss += loss.item() * x.size(0)
          _, preds = torch.max(out, 1)
          running_corrects += (preds == y).sum().item()
          total += x.size(0)
    return running_loss/total, running_corrects/total



In [None]:
epochs = 10

train_losses, val_losses = [], []
train_accs, val_accs = [], []


for epoch in range(epochs):
    print("working")
    train_loss, train_acc = train_epoch(model, train_loader)
    val_loss, val_acc = eval_epoch(model, val_loader)
    print(f"Epoch {epoch+1}/{epochs}: train_loss={train_loss:.4f}, train_acc={train_acc:.4f}, val_loss={val_loss:.4f}, val_acc={val_acc:.4f}")

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accs.append(train_acc)
    val_accs.append(val_acc)


test_loss, test_acc = eval_epoch(model, test_loader)
print("Test accuracy:", test_acc)

working
