In [18]:
# import kagglehub
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import random
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import efficientnet_b5, EfficientNet_B5_Weights
from torchvision.models._api import WeightsEnum
from torch.hub import load_state_dict_from_url
import torch.optim as optim
import torch

In [19]:
path="../input/final_split_training_augmented/train"

transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    # You can test the normalization but if u apply remember to apply for testing
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225]),
])

allDataSet = datasets.ImageFolder(root=path, transform=transform)
totalSize = len(allDataSet)
reductionRatio = 1
reducedSize = int(totalSize * reductionRatio)
indices = list(range(totalSize))
random.shuffle(indices)
reduced_indices = indices[:reducedSize]
reducedDataset = Subset(allDataSet, reduced_indices)

print(f"Reduced Train: {len(reducedDataset)}")
dataloader = DataLoader(reducedDataset, batch_size=32, shuffle=True, drop_last=True)


Reduced Train: 8025


In [20]:
path="../input/final_split_training_augmented/eval"

transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
])

evalSet = datasets.ImageFolder(root=path, transform=transform)

print(f"Reduced Train: {len(evalSet)}")
valLoader = DataLoader(evalSet, batch_size=32, shuffle=True)


Reduced Train: 579


In [21]:
# Number of classes in your dataset
num_classes = len(reducedDataset.dataset.classes) 
print(num_classes)

4


### Base Model

In [22]:
def get_state_dict(self, *args, **kwargs):
    kwargs.pop("check_hash")
    return load_state_dict_from_url(self.url, *args, **kwargs)
WeightsEnum.get_state_dict = get_state_dict

model = efficientnet_b5(weights="DEFAULT")

num_classes = len(reducedDataset.dataset.classes)
print("Number of classes:", num_classes)

# Freeze everything
for param in model.parameters():
    param.requires_grad = False

# Add SE block + classifier
channels = model.classifier[1].in_features
model.classifier = nn.Sequential(
    nn.Dropout(p=0.4),
    nn.Linear(channels, num_classes)
)


optimizer = optim.Adam(model.classifier.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.train()

Number of classes: 4


EfficientNet(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 48, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(48, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): MBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(48, 48, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=48, bias=False)
            (1): BatchNorm2d(48, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
          (1): SqueezeExcitation(
            (avgpool): AdaptiveAvgPool2d(output_size=1)
            (fc1): Conv2d(48, 12, kernel_size=(1, 1), stride=(1, 1))
            (fc2): Conv2d(12, 48, kernel_size=(1, 1), stride=(1, 1))
            (activation): SiLU(inplace=True)
            (scale_activation): Sigmoid()
          )
          (2): Conv2dNormAct

In [23]:
epochs = 10
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    acc = correct / total
    print(f"Epoch [{epoch+1}/{epochs}] Loss: {running_loss/total:.4f} Accuracy: {acc:.4f}")

Epoch [1/10] Loss: 1.1125 Accuracy: 0.5346
Epoch [2/10] Loss: 0.9516 Accuracy: 0.6059
Epoch [3/10] Loss: 0.9030 Accuracy: 0.6336
Epoch [4/10] Loss: 0.8687 Accuracy: 0.6515
Epoch [5/10] Loss: 0.8643 Accuracy: 0.6461
Epoch [6/10] Loss: 0.8489 Accuracy: 0.6561
Epoch [7/10] Loss: 0.8443 Accuracy: 0.6606
Epoch [8/10] Loss: 0.8289 Accuracy: 0.6703
Epoch [9/10] Loss: 0.8295 Accuracy: 0.6691
Epoch [10/10] Loss: 0.8370 Accuracy: 0.6603


In [24]:
torch.save(model.state_dict(), "../models/baseModelClassifier.pth")

### SE Classifier

In [6]:
class SqueezeExcitationBlock(nn.Module):
    def __init__(self, channels, reduction_ratio=16):
        super(SqueezeExcitationBlock, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.interaction = nn.Sequential(
            nn.Linear(channels, channels // reduction_ratio, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channels // reduction_ratio, channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.interaction(y).view(b, c, 1, 1)
        return x * y.expand_as(x)

def get_state_dict(self, *args, **kwargs):
    kwargs.pop("check_hash")
    return load_state_dict_from_url(self.url, *args, **kwargs)
WeightsEnum.get_state_dict = get_state_dict

model = efficientnet_b5(weights="DEFAULT")

num_classes = len(reducedDataset.dataset.classes)
print("Number of classes:", num_classes)

# Freeze everything
for param in model.parameters():
    param.requires_grad = False

# Add SE block + classifier
channels = model.classifier[1].in_features
model.classifier = nn.Sequential(
    nn.Unflatten(1, (channels, 1, 1)),          # Reshape [B, C] → [B, C, 1, 1]
    SqueezeExcitationBlock(channels),
    nn.Flatten(),                               # Back to [B, C]
    nn.Dropout(p=0.4),
    nn.Linear(channels, num_classes)
)


optimizer = optim.Adam(model.classifier.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.train()

Number of classes: 4


EfficientNet(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 48, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(48, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): MBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(48, 48, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=48, bias=False)
            (1): BatchNorm2d(48, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
          (1): SqueezeExcitation(
            (avgpool): AdaptiveAvgPool2d(output_size=1)
            (fc1): Conv2d(48, 12, kernel_size=(1, 1), stride=(1, 1))
            (fc2): Conv2d(12, 48, kernel_size=(1, 1), stride=(1, 1))
            (activation): SiLU(inplace=True)
            (scale_activation): Sigmoid()
          )
          (2): Conv2dNormAct

In [7]:
epochs = 10
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    acc = correct / total
    print(f"Epoch [{epoch+1}/{epochs}] Loss: {running_loss/total:.4f} Accuracy: {acc:.4f}")

Epoch [1/10] Loss: 1.1311 Accuracy: 0.5366
Epoch [2/10] Loss: 0.9220 Accuracy: 0.6286
Epoch [3/10] Loss: 0.8327 Accuracy: 0.6664
Epoch [4/10] Loss: 0.7390 Accuracy: 0.7121
Epoch [5/10] Loss: 0.6438 Accuracy: 0.7560
Epoch [6/10] Loss: 0.5549 Accuracy: 0.7994
Epoch [7/10] Loss: 0.4800 Accuracy: 0.8359
Epoch [8/10] Loss: 0.4128 Accuracy: 0.8604
Epoch [9/10] Loss: 0.3671 Accuracy: 0.8799
Epoch [10/10] Loss: 0.3272 Accuracy: 0.8945


In [8]:
torch.save(model.state_dict(), "../models/SEClassifier.pth")

### bestSEModified

Select one of the subcells here to expand and run to define the model before going to the training block

#### SE after features and simple classifier (0.7290 on test)

In [25]:
def get_state_dict(self, *args, **kwargs):
    kwargs.pop("check_hash")
    return load_state_dict_from_url(self.url, *args, **kwargs)

class SqueezeExcitationBlock(nn.Module):
    def __init__(self, channels, reduction_ratio=16):
        super(SqueezeExcitationBlock, self).__init__()
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(

        )
        self.interaction = nn.Sequential(
            nn.Linear(channels, channels // reduction_ratio, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channels // reduction_ratio, channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.global_avg_pool(x).view(b, c)
        y = self.interaction(y).view(b, c, 1, 1)
        return x * y.expand_as(x)    

class CustomENB5(nn.Module):
    def __init__(self,num_classes):
        super(CustomENB5, self).__init__()
        WeightsEnum.get_state_dict = get_state_dict

        self.base = efficientnet_b5(weights="DEFAULT")
        for param in self.base.parameters():
            param.requires_grad = False
        
        self.seAfterFeature = SqueezeExcitationBlock(
            channels=2048
        )

        self.base.classifier = nn.Sequential(
            nn.Unflatten(1, (2048, 1, 1)),         
            SqueezeExcitationBlock(2048),
            nn.Flatten(),                            
            nn.Dropout(p=0.4),
            nn.Linear(2048, num_classes)
        )

    def forward(self, x):
        x = self.base.features(x)
        x = self.seAfterFeature(x)
        x = self.base.avgpool(x)
        x = torch.flatten(x,1)
        x = self.base.classifier(x)
        return x        

#### SE after features and new classifier strucuture (0.6958)

In [15]:
def get_state_dict(self, *args, **kwargs):
    kwargs.pop("check_hash")
    return load_state_dict_from_url(self.url, *args, **kwargs)

class SqueezeExcitationBlock(nn.Module):
    def __init__(self, channels, reduction_ratio=16):
        super(SqueezeExcitationBlock, self).__init__()
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(

        )
        self.interaction = nn.Sequential(
            nn.Linear(channels, channels // reduction_ratio, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channels // reduction_ratio, channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.global_avg_pool(x).view(b, c)
        y = self.interaction(y).view(b, c, 1, 1)
        return x * y.expand_as(x)
     
class DenseResidualBlock(nn.Module):
    """
    Dense Residual Block
    Processes input with skip connections and returns in the same number of channels

    inChannels (int): the number of input channels going into the block (and also being returned)
    internalSize (int): the number of channels within the internal layers of the block
    """
    def __init__(self, in_features, hidden_size=128, dropout_rate=0.3):
        super(DenseResidualBlock, self).__init__()

        self.fc1 = nn.Linear(in_features, hidden_size)
        self.bn1 = nn.BatchNorm1d(hidden_size)

        self.dropout = nn.Dropout(p=dropout_rate)

        self.fc2 = nn.Linear(hidden_size, in_features)
        self.bn2 = nn.BatchNorm1d(in_features)

    def forward(self, x):
        identity = x  # Save for residual

        out = F.relu(self.bn1(self.fc1(x)))
        out = self.dropout(out)
        out = F.relu(self.bn2(self.fc2(out)))

        return identity + out 
  
class CustomENB5(nn.Module):
    def __init__(self,num_classes):
        super(CustomENB5, self).__init__()
        WeightsEnum.get_state_dict = get_state_dict

        self.base = efficientnet_b5(weights="DEFAULT")
        for name, param in self.base.named_parameters():
            if "features.6" in name or "features.7" in name:
                param.requires_grad = True
        
        self.seAfterFeature = SqueezeExcitationBlock(
            channels=2048
        )

        self.base.classifier = nn.Sequential(
            nn.Dropout(p=0.3),
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Dropout(p=0.3),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(p=0.3),
            DenseResidualBlock(in_features=256),
            nn.Linear(256, 512),
            nn.BatchNorm1d(512),
            nn.Dropout(p=0.3),
            nn.Linear(512, num_classes)
        )
        

    def forward(self, x):
        x = self.base.features(x)
        x = self.seAfterFeature(x)
        x = F.adaptive_avg_pool2d(x, (1, 1))
        x = torch.flatten(x,1)
        x = self.base.classifier(x)
        return x        

### Trainer

In [26]:
model = CustomENB5(num_classes)

optimizer = optim.Adam(model.base.classifier.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.train()

CustomENB5(
  (base): EfficientNet(
    (features): Sequential(
      (0): Conv2dNormActivation(
        (0): Conv2d(3, 48, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (1): BatchNorm2d(48, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
        (2): SiLU(inplace=True)
      )
      (1): Sequential(
        (0): MBConv(
          (block): Sequential(
            (0): Conv2dNormActivation(
              (0): Conv2d(48, 48, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=48, bias=False)
              (1): BatchNorm2d(48, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
              (2): SiLU(inplace=True)
            )
            (1): SqueezeExcitation(
              (avgpool): AdaptiveAvgPool2d(output_size=1)
              (fc1): Conv2d(48, 12, kernel_size=(1, 1), stride=(1, 1))
              (fc2): Conv2d(12, 48, kernel_size=(1, 1), stride=(1, 1))
              (activation): SiLU(inplace=True)
              (scale_a

In [27]:
epochs = 10
best_val_acc = 0.0
patience = 3
trigger_times = 0

for epoch in range(epochs):
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    acc = correct / total
    print(f"Epoch [{epoch+1}/{epochs}] Loss: {running_loss/total:.4f} Accuracy: {acc:.4f}")

    model.eval()
    eval_loss = 0.0
    eval_correct = 0
    eval_total = 0

    with torch.no_grad():
        for images, labels in valLoader:  # assumes valLoader is defined
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            eval_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs, 1)
            eval_correct += (predicted == labels).sum().item()
            eval_total += labels.size(0)

    val_loss = eval_loss / eval_total
    val_acc = eval_correct / eval_total
    print(f"Eval  Loss: {val_loss:.4f} | Accuracy: {val_acc:.4f}")
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        trigger_times = 0
        torch.save(model.state_dict(), "../models/bestSEModified.pth")
    else:
        trigger_times += 1
        print(f"No improvement. Trigger {trigger_times}/{patience}")

        if trigger_times >= patience:
            print("Early stopping triggered.")
            break

Epoch [1/10] Loss: 1.2028 Accuracy: 0.5370
Eval  Loss: 1.2293 | Accuracy: 0.4646
Epoch [2/10] Loss: 0.9634 Accuracy: 0.6408
Eval  Loss: 1.1937 | Accuracy: 0.4767
Epoch [3/10] Loss: 0.8084 Accuracy: 0.7039
Eval  Loss: 1.1878 | Accuracy: 0.4888
Epoch [4/10] Loss: 0.6715 Accuracy: 0.7772
Eval  Loss: 1.1907 | Accuracy: 0.4940
Epoch [5/10] Loss: 0.5238 Accuracy: 0.8509
Eval  Loss: 1.1961 | Accuracy: 0.4905
No improvement. Trigger 1/3
Epoch [6/10] Loss: 0.3808 Accuracy: 0.9219
Eval  Loss: 1.2056 | Accuracy: 0.4801
No improvement. Trigger 2/3
Epoch [7/10] Loss: 0.2643 Accuracy: 0.9640
Eval  Loss: 1.2175 | Accuracy: 0.4922
No improvement. Trigger 3/3
Early stopping triggered.
