In [1]:
import torch
import torch.nn as nn 
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [3]:
# EfficientNet-B2 expects 260×260 images. 
IMG_SIZE = 260
BATCH_SIZE = 32

In [4]:
# train transforms 
train_transforms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

In [5]:
# test and validation transforms 
val_transforms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

In [6]:
train_dir = "Data/snack_vision_lite/train"
val_dir = "Data/snack_vision_lite/val"
test_dir = "Data/snack_vision_lite/test"

In [7]:
train_dataset = datasets.ImageFolder(train_dir , train_transforms)
test_dataset = datasets.ImageFolder(test_dir , val_transforms)
val_dataset = datasets.ImageFolder(val_dir , val_transforms)

In [27]:
train_loader = DataLoader(
    dataset = train_dataset , batch_size = BATCH_SIZE , pin_memory = True , num_workers = 4 , shuffle = True
)
val_loader = DataLoader(
    dataset = val_dataset , batch_size = BATCH_SIZE , pin_memory = True , num_workers = 4
)
test_loader = DataLoader(
    dataset = test_dataset , batch_size = BATCH_SIZE , pin_memory = True , num_workers = 4
)

In [18]:
num_classes = len(train_dataset.classes)

In [19]:
(num_classes)

3

In [10]:
model = models.efficientnet_b2(weights = "IMAGENET1K_V1" , progress = True)

In [13]:
from torchinfo import summary
summary(model , input_size = (BATCH_SIZE , 3 , IMG_SIZE , IMG_SIZE))

Layer (type:depth-idx)                                  Output Shape              Param #
EfficientNet                                            [32, 1000]                --
├─Sequential: 1-1                                       [32, 1408, 9, 9]          --
│    └─Conv2dNormActivation: 2-1                        [32, 32, 130, 130]        --
│    │    └─Conv2d: 3-1                                 [32, 32, 130, 130]        864
│    │    └─BatchNorm2d: 3-2                            [32, 32, 130, 130]        64
│    │    └─SiLU: 3-3                                   [32, 32, 130, 130]        --
│    └─Sequential: 2-2                                  [32, 16, 130, 130]        --
│    │    └─MBConv: 3-4                                 [32, 16, 130, 130]        1,448
│    │    └─MBConv: 3-5                                 [32, 16, 130, 130]        612
│    └─Sequential: 2-3                                  [32, 24, 65, 65]          --
│    │    └─MBConv: 3-6                                

In [14]:
class EfficientNetUpdated(nn.Module): 
    def __init__(self , base_model, num_classes, freeze_backbone = True, dropout = 0.3): 
        super().__init__()

        self.base_model = base_model

        # -------freeze the cnn part(backbone) ----------
        if freeze_backbone: 
            for param in self.base_model.parameters(): 
                param.requires_grad = False

        # ------ replace classifier with our objective -------------
        if hasattr(self.base_model , 'classifier'): 
            in_features = self.base_model.classifier[-1].in_features
            # replace now 
            self.base_model.classifier = nn.Sequential(
                nn.Dropout(dropout), 
                nn.Linear(in_features , num_classes)
            )
        elif hasattr(self.base_model , 'fc'): 
            in_features = self.base_model.fc.in_features
            self.base_model.fc = nn.Sequential(
                nn.Dropout(dropout),
                nn.Linear(in_features, num_classes)
            )
        else:
            raise ValueError("Unsupported model architecture")
            
    def forward(self , x): 
        return self.base_model(x) 

In [29]:
updated_model = EfficientNetUpdated(base_model = model,num_classes = num_classes).to(device)

In [22]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(
    filter(lambda p: p.requires_grad, updated_model.parameters()), lr = 1e-3
)

In [21]:
def train_single_epoch(model,train_loader,val_loader, criterion, optimizer, device):
    # -------- Training --------
    model.train()
    train_loss = 0.0

    for x, y in train_loader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()
        outputs = model(x)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    train_loss /= len(train_loader)

    # -------- Training accuracy --------
    model.eval()
    train_correct = 0
    total_train = 0

    with torch.no_grad():
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            outputs = model(x)
            preds = torch.argmax(outputs, dim=1)

            train_correct += (preds == y).sum().item()
            total_train += y.size(0)

    train_acc = train_correct / total_train

    # -------- Validation accuracy --------
    val_correct = 0
    total_val = 0

    with torch.no_grad():
        for x, y in val_loader:
            x, y = x.to(device), y.to(device)
            outputs = model(x)
            preds = torch.argmax(outputs, dim=1)

            val_correct += (preds == y).sum().item()
            total_val += y.size(0)

    val_acc = val_correct / total_val

    return train_loss, train_acc, val_acc

In [None]:
EPOCHS = 10
for epoch in range(EPOCHS): 
    train_loss, train_acc, val_acc = train_single_epoch(
        model = updated_model, 
        train_loader = train_loader,optimizer = optimizer,
        val_loader = val_loader,criterion = criterion,device = device
    )
    print(f"[{epoch + 1}/{EPOCHS}]: train loss: {train_loss:.4f} | train acc: {train_acc:.4f} | val acc: {val_acc:.4f}")