In [1]:
%%capture
# Extract the flowers archive into data/. The %%capture cell magic above
# swallows the cell's stdout/stderr so unzip's output is not shown.
!unzip "flowers.zip" -d "data/"

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, ConcatDataset
import torchvision
import torchvision.transforms as transforms
from typing import Dict, List, Tuple
import copy
import numpy as np
from torchvision.models import efficientnet_v2_l, efficientnet_v2_s
from torchvision.models import EfficientNet_V2_L_Weights, EfficientNet_V2_S_Weights

In [3]:
def create_base_transforms():
    """Build the deterministic preprocessing pipeline (no augmentation).

    The pipeline resizes each image to size (320, 240), converts it to a
    tensor, and normalizes the three channels with the fixed mean/std
    constants below.

    Returns:
        A ``transforms.Compose`` object applying those three steps in order.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    steps = [
        transforms.Resize((320, 240)),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)


def create_augmented_transforms(strength: int = 0):
    """Build a randomized training pipeline whose intensity grows with ``strength``.

    Args:
        strength: Augmentation level. Each increment widens the rotation
            range by 5 degrees, the translation fraction by 0.05 and the
            brightness/contrast/saturation jitter by 0.1.

    Returns:
        A ``transforms.Compose`` that resizes to (320, 240), applies the
        random flip / rotation / affine / colour-jitter steps, converts to
        a tensor and normalizes the channels.
    """
    # Derive every strength-dependent magnitude up front.
    max_rotation = 20 + strength * 5
    max_shift = 0.1 + strength * 0.05
    jitter = 0.2 + strength * 0.1

    affine = transforms.RandomAffine(
        degrees=0,  # rotation is handled by RandomRotation; this step only shifts and scales
        translate=(max_shift, max_shift),
        scale=(0.9, 1.1),
    )
    colour = transforms.ColorJitter(
        brightness=jitter,
        contrast=jitter,
        saturation=jitter,
    )
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    )

    steps = [
        transforms.Resize((320, 240)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(max_rotation),
        affine,
        colour,
        transforms.ToTensor(),
        normalize,
    ]
    return transforms.Compose(steps)


def create_teacher_model(num_classes: int = 5):
    """Build the teacher network: EfficientNetV2-L with a new classification head.

    The model is loaded with the ``IMAGENET1K_V1`` weights and all of its
    parameters are frozen. The classifier is then replaced by a fresh
    Dropout -> Linear(512) -> ReLU -> Dropout -> Linear(num_classes) head;
    because the head is created after the freeze, its parameters remain
    trainable.

    Args:
        num_classes: Number of output logits of the new head.

    Returns:
        The modified EfficientNetV2-L module.
    """
    teacher = efficientnet_v2_l(weights=EfficientNet_V2_L_Weights.IMAGENET1K_V1)

    # Freeze every pretrained parameter (backbone and the soon-to-be-replaced head).
    teacher.requires_grad_(False)

    # Read the input width of the stock classifier's linear layer before swapping it out.
    in_dim = teacher.classifier[1].in_features
    head_layers = [
        nn.Dropout(p=0.4),
        nn.Linear(in_dim, 512),
        nn.ReLU(),
        nn.Dropout(p=0.3),
        nn.Linear(512, num_classes),
    ]
    teacher.classifier = nn.Sequential(*head_layers)

    return teacher


def create_student_model(num_classes: int = 5):
    """Build the student network: EfficientNetV2-S with a new classification head.

    The model is loaded with the ``IMAGENET1K_V1`` weights; no parameters
    are frozen here. The classifier is replaced by a
    Dropout -> Linear(256) -> ReLU -> Dropout -> Linear(num_classes) head.

    Args:
        num_classes: Number of output logits of the new head.

    Returns:
        The modified EfficientNetV2-S module.
    """
    student = efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.IMAGENET1K_V1)

    # Input width of the stock classifier's linear layer.
    in_dim = student.classifier[1].in_features
    head_layers = [
        nn.Dropout(p=0.3),
        nn.Linear(in_dim, 256),
        nn.ReLU(),
        nn.Dropout(p=0.2),
        nn.Linear(256, num_classes),
    ]
    student.classifier = nn.Sequential(*head_layers)

    return student


class FlowerDataset(Dataset):
    """Thin ``Dataset`` wrapper around a ``torchvision.datasets.ImageFolder``.

    The wrapped folder dataset is exposed as ``self.dataset``; length and
    item access are delegated to it unchanged.
    """

    def __init__(self, data_dir, transform=None):
        """Open ``data_dir`` as an ImageFolder, applying ``transform`` to each image."""
        image_folder = torchvision.datasets.ImageFolder(root=data_dir, transform=transform)
        self.dataset = image_folder

    def __len__(self):
        """Return the number of samples in the wrapped dataset."""
        wrapped = self.dataset
        return len(wrapped)

    def __getitem__(self, idx):
        """Return whatever the wrapped dataset yields at position ``idx``."""
        wrapped = self.dataset
        return wrapped[idx]


def prepare_data(
    batch_size=8,
    train_dir='data/pic/train',
    val_dir='data/pic/validation',
    num_workers=0
):
    """Create the training and validation data loaders.

    Both datasets are built with their own instance of the base
    (non-augmented) transform pipeline, so changing the transform of one
    dataset later does not affect the other.

    Args:
        batch_size: Number of samples per batch for both loaders.
        train_dir: Root folder of the training images (one sub-folder per class).
        val_dir: Root folder of the validation images (one sub-folder per class).
        num_workers: Worker process count passed to both ``DataLoader``s.

    Returns:
        ``(train_loader, val_loader)``; the training loader shuffles, the
        validation loader does not.
    """
    # Creating the datasets
    train_dataset = FlowerDataset(train_dir, transform=create_base_transforms())
    val_dataset = FlowerDataset(val_dir, transform=create_base_transforms())

    # Data loaders
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers
    )
    val_loader = DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers
    )

    return train_loader, val_loader


def analyze_model_blocks(model):
    """Print every entry of ``model.features`` plus ``model.classifier``.

    Args:
        model: Object exposing an iterable, sized ``features`` attribute and
            a ``classifier`` attribute.

    Returns:
        A dict with the number of feature blocks (``'num_feature_blocks'``)
        and the classifier object itself (``'classifier_info'``).
    """
    feature_blocks = model.features
    head = model.classifier

    print("Model Block Structure:")
    print("\nFeatures blocks:")
    for position, block in enumerate(feature_blocks):
        print(f"\nBlock {position}:")
        print(block)

    print("\nClassifier:")
    print(head)

    summary = {
        'num_feature_blocks': len(feature_blocks),
        'classifier_info': head,
    }
    return summary

In [4]:
def train_one_epoch(
    model: nn.Module,
    train_loader: DataLoader,
    criterion: nn.Module,
    optimizer: optim.Optimizer,
    device: str
) -> float:
    """Run one optimization pass over ``train_loader``.

    The model is put in training mode; for every batch the gradients are
    cleared, the loss is computed and back-propagated, and the optimizer
    takes one step.

    Returns:
        The sum of the per-batch loss values divided by the number of
        batches in ``train_loader``.
    """
    model.train()
    batch_losses = []

    for batch_inputs, batch_labels in train_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        batch_loss = criterion(model(batch_inputs), batch_labels)
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(train_loader)


def validate_model(
    model: nn.Module,
    val_loader: DataLoader,
    criterion: nn.Module,
    device: str
) -> Tuple[float, float]:
    """Evaluate ``model`` on ``val_loader`` without tracking gradients.

    Returns:
        ``(loss, accuracy)`` where ``loss`` is the sum of per-batch loss
        values divided by the number of batches, and ``accuracy`` is the
        percentage (0-100) of samples whose highest-scoring output index
        equals the label.
    """
    model.eval()
    loss_sum = 0
    num_correct = 0
    num_seen = 0

    with torch.no_grad():
        for batch_inputs, batch_labels in val_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_labels).item()

            # Index of the largest output along dim 1 is the predicted class.
            predictions = logits.max(1).indices
            num_correct += (predictions == batch_labels).sum().item()
            num_seen += batch_labels.shape[0]

    mean_loss = loss_sum / len(val_loader)
    accuracy_pct = 100. * num_correct / num_seen
    return mean_loss, accuracy_pct


def generate_pseudo_labels(
    model: nn.Module,
    test_loader: DataLoader,
    device: str
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Label every sample of ``test_loader`` with the model's top prediction.

    The labels carried by the loader are ignored. All input batches are
    moved to ``device`` and kept there, so the returned tensors live on
    that device.

    Returns:
        ``(inputs, pseudo_labels)``: the concatenated input batches and,
        for each sample, the index of the largest model output along dim 1.
    """
    model.eval()
    input_batches = []
    label_batches = []

    with torch.no_grad():
        for batch, _ in test_loader:
            batch = batch.to(device)
            top_class = model(batch).max(1).indices

            label_batches.append(top_class)
            input_batches.append(batch)

    stacked_inputs = torch.cat(input_batches)
    stacked_labels = torch.cat(label_batches)
    return stacked_inputs, stacked_labels

In [5]:
class EarlyStopping:
    """Track validation loss and signal when it has stopped improving.

    Call the instance once per epoch with the current validation loss and
    the model. A deep copy of the model's ``state_dict`` is kept for the
    best loss seen so far (``best_model``); ``early_stop`` becomes True
    once ``patience`` consecutive calls have failed to improve.

    Args:
        patience: Number of consecutive non-improving calls tolerated
            before ``early_stop`` is set.
        min_delta: Amount by which the loss must drop below ``best_loss``
            to count as an improvement. With the default 0, an equal loss
            counts as an improvement.
        verbose: When True, print the counter on every non-improving call.
    """

    def __init__(self, patience=3, min_delta=0, verbose=False):
        self.patience = patience
        self.min_delta = min_delta
        self.verbose = verbose
        self.counter = 0          # consecutive non-improving calls
        self.best_loss = None     # lowest loss observed so far
        self.early_stop = False   # set once counter reaches patience
        self.best_model = None    # state_dict snapshot taken at best_loss

    def __call__(self, val_loss, model):
        """Record one epoch's validation loss and update the stopping state."""
        # NaN is the only value that is not equal to itself. Every ordering
        # comparison with NaN is False, so without this check a diverged
        # epoch would be recorded as the new "best" loss and its weights
        # would overwrite the snapshot.
        loss_is_nan = val_loss != val_loss

        if self.best_loss is None:
            # First observation: always take a snapshot so best_model is
            # never None. A NaN first loss is stored as +inf so that any
            # later finite loss replaces it.
            self.best_loss = float('inf') if loss_is_nan else val_loss
            self.best_model = copy.deepcopy(model.state_dict())
        elif not loss_is_nan and val_loss <= self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.best_model = copy.deepcopy(model.state_dict())
            self.counter = 0
        else:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True


def _run_teacher_epoch(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler,
    early_stopping,
    history,
    device,
    epoch_label,
    phase_label
) -> bool:
    """Train and validate for one epoch; return True if early stopping fired.

    Appends the epoch's metrics to ``history``, steps the scheduler on the
    validation loss and, when early stopping triggers, reloads the best
    weights recorded by ``early_stopping`` into ``model``.
    """
    train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate_model(model, val_loader, criterion, device)

    scheduler.step(val_loss)
    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    print(f'Epoch {epoch_label} - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print(f"Early stopping triggered in {phase_label}")
        model.load_state_dict(early_stopping.best_model)
        return True
    return False


def train_teacher_model(
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    num_epochs: int = 30,
    device: str = 'cuda',
    patience: int = 3,
    unlabeled_loader: DataLoader = None
) -> Tuple[Dict, nn.Module]:
    """Train the teacher in three phases and return its history and weights.

    Phase 1 progressively unfreezes ``model.features`` from the last block
    backwards. Phase 2 alternates augmented and base transforms on the
    training dataset. Phase 3 continues training on the training data plus
    pseudo-labeled samples. One optimizer and one LR scheduler are shared
    by all phases; early stopping is reset at the start of every phase.

    Args:
        model: Network to train. Phase 1 only changes ``requires_grad`` when
            the model has a ``features`` attribute.
        train_loader: Training loader. Its ``dataset.dataset.transform`` is
            swapped during Phase 2 and restored afterwards.
        val_loader: Validation loader used for the scheduler, early stopping
            and the reported metrics.
        num_epochs: Total epoch budget, split evenly over the three phases
            (``num_epochs // 3`` each; the default 30 gives 10 per phase).
        device: Device the model and batches are moved to (anything
            accepted by ``.to()``).
        patience: Early-stopping patience applied within each phase.
        unlabeled_loader: Loader whose samples are pseudo-labeled for
            Phase 3; its own labels are ignored. When None, ``val_loader``
            is used.

    Returns:
        ``(history, model)`` where ``history`` maps ``'train_loss'``,
        ``'val_loss'`` and ``'val_acc'`` to per-epoch lists.
    """
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3)

    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}
    phase_epochs = num_epochs // 3

    # Phase 1: Progressive Unfreezing
    print("Phase 1: Progressive Unfreezing")
    early_stopping = EarlyStopping(patience=patience, verbose=True)
    for epoch in range(phase_epochs):
        if hasattr(model, 'features'):
            total_layers = len(model.features)
            # Unfreeze a growing tail of the feature blocks; the final
            # epoch of the phase unfreezes all of them.
            layers_to_unfreeze = int((epoch + 1) / phase_epochs * total_layers)
            first_trainable = total_layers - layers_to_unfreeze
            for i, layer in enumerate(model.features):
                for param in layer.parameters():
                    param.requires_grad = i >= first_trainable

        stopped = _run_teacher_epoch(
            model, train_loader, val_loader, criterion, optimizer, scheduler,
            early_stopping, history, device,
            f'{epoch + 1}/{phase_epochs}', 'Phase 1'
        )
        if stopped:
            break

    # Phase 2: Iterative Augmentation
    print("\nPhase 2: Iterative Augmentation")
    early_stopping = EarlyStopping(patience=patience, verbose=True)

    # train_loader.dataset is expected to wrap an ImageFolder in `.dataset`.
    image_folder = train_loader.dataset.dataset
    original_transform = image_folder.transform
    try:
        for epoch in range(phase_epochs):
            # Even epochs train on augmented images of increasing strength,
            # odd epochs on the plain base pipeline.
            if epoch % 2 == 0:
                image_folder.transform = create_augmented_transforms(epoch // 2)
            else:
                image_folder.transform = create_base_transforms()

            stopped = _run_teacher_epoch(
                model, train_loader, val_loader, criterion, optimizer, scheduler,
                early_stopping, history, device,
                f'{epoch + 1 + phase_epochs}/{2 * phase_epochs}', 'Phase 2'
            )
            if stopped:
                break
    finally:
        # Put the caller's transform back. Otherwise an early stop on an
        # augmented epoch would leave augmentation active for Phase 3 and
        # for anything that reuses train_loader after this function.
        image_folder.transform = original_transform

    print("\nPhase 3: Pseudo-labeling")
    early_stopping = EarlyStopping(patience=patience, verbose=True)

    # Materialize the whole training set as tensors on the target device.
    train_inputs, train_labels = [], []
    for inputs, labels in train_loader:
        train_inputs.append(inputs.to(device))
        train_labels.append(labels.to(device))

    train_inputs = torch.cat(train_inputs, 0)
    train_labels = torch.cat(train_labels, 0)

    # Generate pseudo-labels.
    # NOTE(review): with unlabeled_loader=None the pseudo-labeled samples are
    # the validation images themselves, so the Phase 3 validation metrics are
    # measured on data the model is trained on. Pass a separate unlabeled
    # loader to keep the validation set held out.
    pseudo_source = unlabeled_loader if unlabeled_loader is not None else val_loader
    pseudo_inputs, pseudo_labels = generate_pseudo_labels(model, pseudo_source, device)

    # Create two TensorDatasets, both on the same device
    train_dataset = torch.utils.data.TensorDataset(train_inputs, train_labels)
    pseudo_dataset = torch.utils.data.TensorDataset(pseudo_inputs, pseudo_labels)

    # Combining
    combined_dataset = ConcatDataset([train_dataset, pseudo_dataset])
    combined_loader = DataLoader(
        combined_dataset,
        batch_size=train_loader.batch_size,
        shuffle=True,
        num_workers=0,
    )

    for epoch in range(phase_epochs):
        stopped = _run_teacher_epoch(
            model, combined_loader, val_loader, criterion, optimizer, scheduler,
            early_stopping, history, device,
            f'{epoch + 1 + 2 * phase_epochs}/{3 * phase_epochs}', 'Phase 3'
        )
        if stopped:
            break

    return history, model


def train_student_model(
    teacher_model: nn.Module,
    student_model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    num_epochs: int = 15,
    temperature: float = 3.0,
    alpha: float = 0.7,
    device: str = 'cuda',
    patience: int = 7
) -> Tuple[Dict, nn.Module]:
    """Train the student by knowledge distillation from a fixed teacher.

    For each batch the loss is
    ``alpha * temperature**2 * KL(student_T || teacher_T) + (1 - alpha) * CE(student, labels)``,
    where the ``_T`` distributions are softmaxes of the logits divided by
    ``temperature``. The teacher runs in eval mode under ``no_grad`` and is
    never updated.

    Args:
        teacher_model: Network providing the soft targets.
        student_model: Network being optimized.
        train_loader: Loader of ``(inputs, labels)`` training batches.
        val_loader: Loader used for validation, LR scheduling and early stopping.
        num_epochs: Maximum number of epochs.
        temperature: Softening temperature applied to both sets of logits.
        alpha: Weight of the distillation term; ``1 - alpha`` weights the
            hard-label cross-entropy.
        device: Device both models and the batches are moved to (anything
            accepted by ``.to()``).
        patience: Early-stopping patience in epochs.

    Returns:
        ``(history, student_model)`` where ``history`` maps ``'train_loss'``,
        ``'val_loss'`` and ``'val_acc'`` to per-epoch lists. The best
        weights are reloaded into the student only when early stopping
        triggers.
    """
    teacher_model = teacher_model.to(device)
    student_model = student_model.to(device)
    teacher_model.eval()

    criterion_ce = nn.CrossEntropyLoss()
    criterion_kl = nn.KLDivLoss(reduction='batchmean')
    optimizer = optim.AdamW(student_model.parameters(), lr=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3)
    early_stopping = EarlyStopping(patience=patience, verbose=True)

    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}

    # Loop-invariant loss weights. The temperature**2 factor compensates for
    # the 1/temperature**2 scaling of the soft-target gradients.
    kd_weight = alpha * temperature * temperature
    ce_weight = 1 - alpha

    for epoch in range(num_epochs):
        student_model.train()
        total_loss = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Soft targets from the frozen teacher.
            with torch.no_grad():
                teacher_outputs = teacher_model(inputs)
                teacher_probs = nn.functional.softmax(teacher_outputs / temperature, dim=1)

            student_outputs = student_model(inputs)
            # KLDivLoss expects log-probabilities as input and probabilities as target.
            student_log_probs = nn.functional.log_softmax(student_outputs / temperature, dim=1)

            distillation_loss = criterion_kl(student_log_probs, teacher_probs)
            student_loss = criterion_ce(student_outputs, labels)

            loss = kd_weight * distillation_loss + ce_weight * student_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        train_loss = total_loss / len(train_loader)
        # Validation uses plain cross-entropy only, so val_loss is not on the
        # same scale as the combined training loss.
        val_loss, val_acc = validate_model(student_model, val_loader, criterion_ce, device)

        scheduler.step(val_loss)
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        print(f'Epoch {epoch+1}/{num_epochs}:')
        print(f'Train Loss: {train_loss:.4f}')
        print(f'Val Loss: {val_loss:.4f}')
        print(f'Val Accuracy: {val_acc:.2f}%')

        early_stopping(val_loss, student_model)
        if early_stopping.early_stop:
            print("Early stopping triggered")
            student_model.load_state_dict(early_stopping.best_model)
            break

    return history, student_model

In [6]:
# Setting up the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

train_loader, val_loader = prepare_data()

# Create models
teacher_model = create_teacher_model()
student_model = create_student_model()

print("Teacher Model (EfficientNetV2-L) Structure:")
teacher_blocks = analyze_model_blocks(teacher_model)

print("Training teacher model...")
teacher_history, trained_teacher = train_teacher_model(
    teacher_model,
    train_loader,
    val_loader,
    device=device,
    patience=3
)

print("\nStudent Model (EfficientNetV2-S) Structure:")
student_blocks = analyze_model_blocks(student_model)

print("\nTraining student model...")
# Pass the resolved device explicitly: train_student_model defaults to
# 'cuda', which fails on a CPU-only machine.
student_history, trained_student = train_student_model(
    trained_teacher,
    student_model,
    train_loader,
    val_loader,
    device=device,
    patience=3
)

# Evaluate the final results (only the accuracies are reported)
final_criterion = nn.CrossEntropyLoss()
teacher_val_loss, teacher_acc = validate_model(trained_teacher, val_loader, final_criterion, device)
student_val_loss, student_acc = validate_model(trained_student, val_loader, final_criterion, device)

print("\nFinal Results on Validation Set:")
print(f"Teacher Model Accuracy: {teacher_acc:.2f}%")
print(f"Student Model Accuracy: {student_acc:.2f}%")

Using device: cuda


Downloading: "https://download.pytorch.org/models/efficientnet_v2_l-59c71312.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_v2_l-59c71312.pth
100%|██████████| 455M/455M [00:02<00:00, 174MB/s]
Downloading: "https://download.pytorch.org/models/efficientnet_v2_s-dd5fe13b.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_v2_s-dd5fe13b.pth
100%|██████████| 82.7M/82.7M [00:00<00:00, 194MB/s]


Teacher Model (EfficientNetV2-L) Structure:
Model Block Structure:

Features blocks:

Block 0:
Conv2dNormActivation(
  (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
  (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  (2): SiLU(inplace=True)
)

Block 1:
Sequential(
  (0): FusedMBConv(
    (block): Sequential(
      (0): Conv2dNormActivation(
        (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
        (2): SiLU(inplace=True)
      )
    )
    (stochastic_depth): StochasticDepth(p=0.0, mode=row)
  )
  (1): FusedMBConv(
    (block): Sequential(
      (0): Conv2dNormActivation(
        (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
        (2): SiLU(inplace=True)