# **Preparation**

In [9]:
# Set up required version
!pip install -r Requirements

Looking in indexes: https://pypi.org/simple, https://download.pytorch.org/whl/cu121


In [10]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset, random_split, ConcatDataset
import torchvision
from torchvision.transforms import transforms
import numpy as np
import matplotlib.pylab as plt
from tqdm.auto import tqdm
import pandas as pd
import seaborn as sns
from sklearn.model_selection import train_test_split
import os

In [11]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'

# **Necessary Methods**

In [8]:
# Necessary methods
def load_all_data(data_dir):
  """
  Build a single shuffled training DataLoader over `<data_dir>/train`.

  The folder is read twice through ImageFolder: once with a deterministic
  resize + normalize pipeline and once with a random augmentation pipeline.
  The two views are concatenated, so the loader serves twice as many samples
  as there are image files. No validation loader is created.

  Args:
    data_dir: Directory that contains the `train` sub-folder.

  Returns:
    DataLoader over the concatenated dataset (batch_size=16, shuffle=True).
  """
  train_root = os.path.join(data_dir, 'train')

  # Per-channel statistics used by every Normalize step below.
  channel_mean = [0.439673, 0.394798, 0.360261]
  channel_std = [0.184119, 0.177669, 0.170195]

  # Deterministic view: convert to float, resize, normalize.
  plain_pipeline = transforms.Compose([
      transforms.PILToTensor(),
      transforms.ConvertImageDtype(torch.float32),
      transforms.Resize((224, 224)),
      transforms.Normalize(mean=channel_mean, std=channel_std),
  ])

  # Augmented view: random flips, crop, colour jitter, blur and a small
  # amount of Gaussian noise (added before normalization).
  augment_pipeline = transforms.Compose([
      transforms.PILToTensor(),
      transforms.ConvertImageDtype(torch.float32),
      transforms.RandomHorizontalFlip(),
      transforms.RandomVerticalFlip(),
      transforms.RandomResizedCrop(size=(224, 224), scale=(0.8, 1)),
      transforms.ColorJitter(brightness=0.25, contrast=0.25, saturation=0.25, hue=0.25),
      transforms.GaussianBlur(kernel_size=3),
      transforms.Lambda(lambda x: x + torch.randn_like(x) * 0.01),
      transforms.Normalize(mean=channel_mean, std=channel_std),
  ])

  # Original images first, augmented copies second (same order as the
  # ConcatDataset always had).
  views = [
      torchvision.datasets.ImageFolder(root=train_root,
                                       transform=pipeline,
                                       target_transform=None)
      for pipeline in (plain_pipeline, augment_pipeline)
  ]

  return DataLoader(ConcatDataset(views), batch_size=16, shuffle=True)
def load_data(data_dir):
  """
  Create train and validation loaders from `<data_dir>/train`.

  The images are split 90/10 with a stratified, seeded (random_state=42)
  split. Training samples go through a random augmentation pipeline;
  validation samples go through a deterministic resize + normalize pipeline
  so that validation metrics are not perturbed by random flips, jitter or
  noise.

  Args:
    data_dir: Directory that contains the `train` sub-folder.

  Returns:
    (train_loader, val_loader): DataLoaders with batch_size=16; only the
    training loader is shuffled.
  """
  # exist_ok=True keeps the call idempotent: re-running the cell no longer
  # raises FileExistsError.
  os.makedirs(name="aio_hutech/train", exist_ok=True)

  train_root = os.path.join(data_dir, 'train')
  channel_mean = [0.439673, 0.394798, 0.360261]
  channel_std = [0.184119, 0.177669, 0.170195]

  # Random augmentation, applied to training samples only.
  train_transform = transforms.Compose([
      transforms.PILToTensor(),
      transforms.ConvertImageDtype(torch.float32),
      transforms.RandomHorizontalFlip(),
      transforms.RandomVerticalFlip(),
      transforms.Resize(size=(224, 224)),
      transforms.ColorJitter(brightness=0.25, contrast=0.25),
      transforms.Normalize(mean=channel_mean, std=channel_std),
      # Gaussian noise is added after normalization here.
      transforms.Lambda(lambda x: x + torch.randn_like(x) * 0.02),
  ])

  # Deterministic preprocessing for validation samples.
  val_transform = transforms.Compose([
      transforms.PILToTensor(),
      transforms.ConvertImageDtype(torch.float32),
      transforms.Resize(size=(224, 224)),
      transforms.Normalize(mean=channel_mean, std=channel_std),
  ])

  # Two views of the same folder: both list the same files in the same
  # order, so one set of indices addresses either view.
  train_view = torchvision.datasets.ImageFolder(root=train_root,
                                        transform=train_transform,
                                        target_transform=None)
  val_view = torchvision.datasets.ImageFolder(root=train_root,
                                        transform=val_transform,
                                        target_transform=None)

  # Get indices and labels
  indices = list(range(len(train_view)))
  labels = [label for _, label in train_view.samples]

  # Perform stratified split
  train_idx, val_idx = train_test_split(indices, test_size=0.1, stratify=labels, random_state=42)

  # Create Subset datasets, each reading through its own transform
  train_ds = Subset(train_view, train_idx)
  val_ds = Subset(val_view, val_idx)

  train_loader = DataLoader(dataset=train_ds,
                            batch_size=16,
                            shuffle=True)
  val_loader = DataLoader(dataset=val_ds,
                          batch_size=16,
                          shuffle=False)

  return train_loader, val_loader

def create_effnetb6(num_classes=4):
  """
  Build an EfficientNet-B6 with ImageNet weights and a new classifier head.

  The pretrained classifier is replaced by a three-layer MLP
  (2304 -> 1024 -> 512 -> num_classes) with Dropout, BatchNorm and ReLU
  between the linear layers. No parameters are frozen, so the whole network
  is trainable.

  Args:
    num_classes: Number of output logits. Defaults to 4.

  Returns:
    The torchvision EfficientNet-B6 model with the replaced `classifier`.
  """
  weights = torchvision.models.EfficientNet_B6_Weights.IMAGENET1K_V1
  model = torchvision.models.efficientnet_b6(weights=weights)

  # in_features=2304 has to match the feature width the backbone feeds into
  # its classifier.
  model.classifier = nn.Sequential(
      nn.Linear(in_features=2304, out_features=1024),
      nn.Dropout(p=0.3),
      nn.BatchNorm1d(num_features=1024),
      nn.ReLU(),
      nn.Linear(in_features=1024, out_features=512),
      nn.Dropout(p=0.3),
      nn.BatchNorm1d(num_features=512),
      nn.ReLU(),
      nn.Linear(in_features=512, out_features=num_classes)
  )

  return model
def train(model : nn.Module,
          train_loader : DataLoader,
          val_loader : DataLoader,
          loss_fn : nn.Module,
          optimizer : torch.optim.Optimizer,
          num_epochs : int,
          device=device):
  """
  Train `model` and evaluate it on `val_loader` after every epoch.

  An ExponentialLR scheduler (gamma=0.95) is attached to `optimizer` and
  stepped once per epoch.

  Args:
    model: Network to train; it is moved to `device`.
    train_loader: Batches of (inputs, integer class labels) for training.
    val_loader: Batches of (inputs, integer class labels) for validation.
    loss_fn: Callable taking (logits, labels) and returning a scalar loss.
    optimizer: Optimizer that updates the parameters of `model`.
    num_epochs: Number of passes over `train_loader`.
    device: Device the model and every batch are moved to.

  Returns:
    Dict with the per-epoch lists 'train_loss', 'train_acc', 'val_loss' and
    'val_acc'. Losses are averaged over the number of batches; accuracies
    are correct predictions divided by the dataset size.
  """
  model.to(device)
  train_losses, val_losses = [], []
  train_accs, val_accs = [], []
  scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

  for _ in tqdm(range(num_epochs)):
    model.train()
    train_loss, train_acc = 0, 0
    for X, y in train_loader:
      X, y = X.to(device), y.to(device)

      y_logits = model(X)
      loss = loss_fn(y_logits, y)

      train_loss += loss.item()
      # argmax over logits picks the same class as argmax over softmax.
      train_acc += (y_logits.argmax(dim=-1) == y).sum().item()

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

    scheduler.step()

    # Validation pass: eval mode and no autograd bookkeeping.
    model.eval()
    val_loss, val_acc = 0, 0
    with torch.inference_mode():
      for X, y in val_loader:
        X, y = X.to(device), y.to(device)

        y_logits = model(X)
        loss = loss_fn(y_logits, y)

        val_loss += loss.item()
        val_acc += (y_logits.argmax(dim=-1) == y).sum().item()

    train_losses.append(train_loss / len(train_loader))
    train_accs.append(train_acc / len(train_loader.dataset))
    val_losses.append(val_loss / len(val_loader))
    val_accs.append(val_acc / len(val_loader.dataset))

  return {'train_loss' : train_losses,
          'train_acc' : train_accs,
          'val_loss': val_losses,
          'val_acc' : val_accs}

def train_all(model : nn.Module,
          train_loader : DataLoader,
          loss_fn : nn.Module,
          optimizer : torch.optim.Optimizer,
          num_epochs : int,
          device=device):
  """
  Train `model` on `train_loader` only, without a validation pass.

  An ExponentialLR scheduler (gamma=0.95) is attached to `optimizer` and
  stepped once per epoch.

  Args:
    model: Network to train; it is moved to `device`.
    train_loader: Batches of (inputs, integer class labels).
    loss_fn: Callable taking (logits, labels) and returning a scalar loss.
    optimizer: Optimizer that updates the parameters of `model`.
    num_epochs: Number of passes over `train_loader`.
    device: Device the model and every batch are moved to.

  Returns:
    Dict with the per-epoch lists 'train_loss' (mean over batches) and
    'train_acc' (correct predictions / dataset size). 'val_loss' and
    'val_acc' are present but always empty lists.
  """
  model.to(device)
  train_losses, train_accs = [], []
  scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

  for _ in tqdm(range(num_epochs)):
    model.train()
    train_loss, train_acc = 0, 0
    for X, y in train_loader:
      X, y = X.to(device), y.to(device)

      y_logits = model(X)
      loss = loss_fn(y_logits, y)

      train_loss += loss.item()
      # argmax over logits picks the same class as argmax over softmax.
      train_acc += (y_logits.argmax(dim=-1) == y).sum().item()

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

    scheduler.step()

    train_losses.append(train_loss / len(train_loader))
    train_accs.append(train_acc / len(train_loader.dataset))

  return {'train_loss' : train_losses,
          'train_acc' : train_accs,
          'val_loss': [],
          'val_acc' : []}

def plot_results(result, model_name):
  """
  Plot loss and accuracy curves side by side.

  Args:
    result: Dict with the per-epoch lists 'train_loss' and 'train_acc', and
      optionally 'val_loss' and 'val_acc'. Validation curves are drawn only
      when their list is non-empty, so the output of `train_all` (which
      returns empty validation lists) can be plotted as well.
    model_name: Label shown as the figure title; skipped when empty/None.
  """
  fig, axes = plt.subplots(1, 2, figsize=(12, 5))  # Create two subplots

  train_losses = result['train_loss']
  epochs = range(1, len(train_losses) + 1)

  # (axis, metric label, training values, validation values)
  panels = [
      (axes[0], 'Loss', train_losses, result.get('val_loss') or []),
      (axes[1], 'Accuracy', result['train_acc'], result.get('val_acc') or []),
  ]

  for ax, metric, train_values, val_values in panels:
    ax.plot(epochs, train_values, label=f'Train {metric}', marker='o')
    # Plotting an empty list against `epochs` raises a shape-mismatch error.
    if len(val_values) > 0:
      ax.plot(epochs, val_values, label=f'Validation {metric}', marker='o')
    ax.set_title(f'{metric} Curve')
    ax.set_xlabel('Epochs')
    ax.set_ylabel(metric)
    ax.legend()
    ax.grid(True)

  if model_name:
    fig.suptitle(model_name)

  plt.tight_layout()
  plt.show()

def predict(model, run_name):
  """
  Predict a class for every image in `data/test`, write the submission CSV
  and report the accuracy against the hard-coded expected labels.

  Args:
    model: Trained classifier returning one row of logits per image; it is
      moved to `device` and switched to eval mode.
    run_name: Label used in the printed accuracy line.

  Returns:
    Accuracy as a float, or None when the number of predictions does not
    match the number of hard-coded expected labels.
  """
  test_dir = "data/test"

  # (id, file name) pairs ordered by id; the id is the name without its
  # extension.
  entries = sorted((os.path.splitext(file_name)[0], file_name)
                   for file_name in os.listdir(test_dir))
  ID = [image_id for image_id, _ in entries]

  manual_transforms = transforms.Compose([
                          transforms.ConvertImageDtype(torch.float32),
                          transforms.Resize((224, 224)),
                          transforms.Normalize(mean=[0.439673, 0.394798, 0.360261], std=[0.184119, 0.177669, 0.170195]),
                      ])

  # Model and inputs must be on the same device.
  model.to(device)
  model.eval()

  preds = []
  # No gradients are needed for prediction.
  with torch.inference_mode():
    for _, file_name in entries:
      img_path = os.path.join(test_dir, file_name)
      # Force 3 channels so grayscale/RGBA files fit the 3-channel Normalize.
      img = torchvision.io.read_image(path=img_path,
                                      mode=torchvision.io.ImageReadMode.RGB)
      img = manual_transforms(img).to(device)
      logits = model(img.unsqueeze(0)).to('cpu')
      # argmax over logits picks the same class as argmax over softmax.
      preds.append(logits.argmax(dim=-1).item())

  # NOTE(review): presumably translates the model's class indices into the
  # label ids expected in the submission -- confirm against the class folders.
  class_mapping = {
      0 : 1,
      1 : 3,
      2 : 0,
      3 : 2
  }
  preds = [class_mapping[label] for label in preds]

  df = pd.DataFrame(data = {
    'id' : ID,
    'type' : preds
  }).set_index('id')

  # to_csv does not create missing directories.
  os.makedirs("aio-hutech", exist_ok=True)
  df.to_csv("aio-hutech/submission.csv")

  # NOTE(review): assumes the sorted test ids are 50 images per class in the
  # order 1, 2, 3, 0 -- verify against the test set.
  test_labels = torch.tensor([[1]*50, [2]*50, [3]*50, [0]*50]).reshape(-1)
  if len(preds) != len(test_labels):
    print(f"[{run_name}] accuracy skipped: {len(preds)} predictions, "
          f"{len(test_labels)} expected labels")
    return None

  test_acc = (torch.tensor(preds) == test_labels).sum().item() / len(test_labels)
  print(f"[{run_name}] test accuracy: {test_acc:.4f}")
  return test_acc

# **Training**

# **Testing**