In [1]:
import os
import torch
import torch.nn as nn
import torch.optim  as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from PIL import Image
from tqdm import tqdm
from math import ceil

![EfficientNet architecture](architecture_images/efficient_net_architecture.png)

In [2]:
# Stage table for the baseline network: one row per stage of inverted residual
# blocks. EfficientNet.create_features scales `channels` by the width factor and
# `repeats` by the depth factor; `stride` is applied only to the first block of
# a stage, and padding is derived from `kernel_size`.
base_model = [
    # expand_ratio, channels, repeats, stride, kernel_size
    [1, 16, 1, 1, 3], 
    [6, 24, 2, 2, 3],
    [6, 40, 2, 2, 5],
    [6, 80, 3, 2, 3],
    [6, 112, 3, 1, 5],
    [6, 192, 4, 2, 5],
    [6, 320, 1, 1, 3],
]

# Per-version settings, keyed by version name. `phi_value` is the exponent used
# in EfficientNet.calculate_factors, `resolution` is the square input size used
# by the smoke test, and `drop_rate` feeds the classifier's Dropout layer.
phi_values = {
    # tuple of: (phi_value, resolution, drop_rate)
    "b0": (0, 224, 0.2), # phi = 0, so both scaling factors evaluate to 1
    "b1": (0.5, 240, 0.2),
    "b2": (1, 260, 0.3),
    "b3": (2, 300, 0.3),
    "b4": (3, 380, 0.4),
    "b5": (4, 456, 0.4),
    "b6": (5, 528, 0.5),
    "b7": (6, 600, 0.5),
}

In [3]:
class CNNBlock(nn.Module):
    """Bias-free 2D convolution followed by batch normalisation and SiLU.

    Args:
        in_channels: Number of channels in the input feature map.
        out_channels: Number of channels produced by the convolution.
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        padding: Zero padding applied by the convolution.
        groups: Number of convolution groups; pass ``in_channels`` for a
            depthwise convolution.
    """

    def __init__(
        self, in_channels, out_channels, kernel_size, stride, padding, groups=1,
    ):
        super().__init__()
        # The convolution has no bias term; batch norm follows it directly.
        self.cnn = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            groups=groups,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.silu = nn.SiLU()

    def forward(self, x):
        """Apply convolution, then batch norm, then SiLU to ``x``."""
        convolved = self.cnn(x)
        normalised = self.bn(convolved)
        return self.silu(normalised)
    

class SqueezeExcitation(nn.Module):
    """Channel-wise gating: rescale each channel by a learned weight in (0, 1).

    Args:
        in_channels: Number of channels of the input (and of the output).
        reduced_dims: Number of channels in the bottleneck between the two
            1x1 convolutions.
    """

    def __init__(self, in_channels, reduced_dims):
        super().__init__()
        squeeze = nn.AdaptiveAvgPool2d(1)  # C x H x W -> C x 1 x 1
        reduce = nn.Conv2d(in_channels, reduced_dims, kernel_size=1)
        restore = nn.Conv2d(reduced_dims, in_channels, kernel_size=1)
        self.se = nn.Sequential(squeeze, reduce, nn.SiLU(), restore, nn.Sigmoid())

    def forward(self, x):
        """Return ``x`` multiplied by its per-channel gate."""
        gate = self.se(x)
        return x * gate
    

class InvertedResidualBlock(nn.Module):
    """Inverted residual block: expand, depthwise conv, squeeze-excite, project.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels of the output feature map.
        kernel_size: Kernel size of the depthwise convolution.
        stride: Stride of the depthwise convolution.
        padding: Padding of the depthwise convolution.
        expand_ratio: Multiplier applied to ``in_channels`` to get the hidden
            width. A ratio of 1 skips the expansion convolution.
        reduction: Divisor applied to ``in_channels`` to size the
            squeeze-excitation bottleneck.
        survival_prob: Probability of keeping the residual branch for a sample
            during training (stochastic depth).
    """

    def __init__(self, 
                 in_channels, 
                 out_channels, 
                 kernel_size, 
                 stride, 
                 padding, 
                 expand_ratio, 
                 reduction=4, # squeeze excitation
                 survival_prob=0.8, # for stochastic depth
                 ):
        super(InvertedResidualBlock, self).__init__()
        
        self.survival_prob = survival_prob
        # A skip connection is only possible when input and output shapes match.
        self.use_residual = in_channels == out_channels and stride == 1
        hidden_dim = in_channels * expand_ratio
        self.expand = in_channels != hidden_dim
        # Clamp to 1: for in_channels < reduction the plain integer conversion
        # yields 0, which would build a zero-channel squeeze-excitation
        # bottleneck. Wider inputs are unaffected.
        reduced_dim = max(1, int(in_channels / reduction))

        if self.expand:
            # NOTE(review): the expansion uses a 3x3 kernel here; presumably the
            # reference MBConv block uses a 1x1 convolution. Left unchanged so
            # previously saved checkpoints keep loading - confirm intent.
            self.expand_conv = CNNBlock(
                in_channels, hidden_dim, kernel_size=3, stride=1, padding=1
            )
        self.conv = nn.Sequential(
            # Depthwise convolution: one group per hidden channel.
            CNNBlock(
                hidden_dim, hidden_dim, kernel_size, stride, padding, groups=hidden_dim, 
            ),
            SqueezeExcitation(hidden_dim, reduced_dim),
            # Pointwise projection back to out_channels, with no activation.
            nn.Conv2d(hidden_dim, out_channels, 1, bias=False),
            nn.BatchNorm2d(out_channels),
        )

    def stochastic_depth(self, x):
        """Randomly zero the branch per sample while training.

        Kept samples are divided by ``survival_prob``. In eval mode ``x`` is
        returned unchanged.
        """
        if not self.training:
            return x
        
        # One keep/drop decision per sample, broadcast over C, H and W.
        binary_tensor = torch.rand(x.shape[0], 1, 1, 1, device=x.device) < self.survival_prob
        return torch.div(x, self.survival_prob) * binary_tensor
    
    def forward(self, inputs):
        """Run the block; add the skip connection when shapes allow it."""
        x = self.expand_conv(inputs) if self.expand else inputs

        if self.use_residual:
            return self.stochastic_depth(self.conv(x)) + inputs
        else:
            return self.conv(x)

In [4]:
class EfficientNet(nn.Module):
    """EfficientNet classifier built from the `base_model` stage table.

    Args:
        version: Key into `phi_values` (for example ``"b0"``).
        num_classes: Number of output logits.
    """

    def __init__(self, version, num_classes):
        super().__init__()
        width_factor, depth_factor, dropout_rate = self.calculate_factors(version)
        last_channels = ceil(1280 * width_factor)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.features = self.create_features(width_factor, depth_factor, last_channels)
        head_layers = [
            nn.Dropout(dropout_rate),
            nn.Linear(last_channels, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def calculate_factors(self, version, alpha=1.2, beta=1.1):
        """Return ``(width_factor, depth_factor, drop_rate)`` for ``version``.

        Depth scales as ``alpha ** phi`` and width as ``beta ** phi``; the
        resolution stored in `phi_values` is not used here.
        """
        phi, _resolution, drop_rate = phi_values[version]
        return beta ** phi, alpha ** phi, drop_rate

    def create_features(self, width_factor, depth_factor, last_channels):
        """Assemble the stem, all scaled stages and the final 1x1 conv block."""
        stem_channels = int(32 * width_factor)
        layers = [CNNBlock(3, stem_channels, 3, stride=2, padding=1)]
        in_channels = stem_channels

        for expand_ratio, stage_channels, stage_repeats, stage_stride, kernel_size in base_model:
            # Scale the width, then round up to a multiple of 4.
            out_channels = 4 * ceil(int(stage_channels * width_factor) / 4)
            num_layers = ceil(stage_repeats * depth_factor)

            for layer_idx in range(num_layers):
                # Only the first block of a stage uses the stage's stride.
                block_stride = stage_stride if layer_idx == 0 else 1
                layers.append(
                    InvertedResidualBlock(
                        in_channels,
                        out_channels,
                        expand_ratio=expand_ratio,
                        stride=block_stride,
                        kernel_size=kernel_size,
                        padding=kernel_size // 2,  # k=1 -> 0, k=3 -> 1, k=5 -> 2
                    )
                )
                in_channels = out_channels

        layers.append(
            CNNBlock(in_channels, last_channels, kernel_size=1, stride=1, padding=0)
        )
        return nn.Sequential(*layers)

    def forward(self, x):
        """Extract features, pool to 1x1, flatten and classify."""
        pooled = self.pool(self.features(x))
        flattened = pooled.view(pooled.shape[0], -1)
        return self.classifier(flattened)
    

In [5]:
def test():
    """Smoke test: push one random batch through EfficientNet-B0.

    Prints the shape of the model output for 4 random images at the
    resolution listed for the version in `phi_values`, with 10 classes.
    """
    version = "b0"
    num_examples, num_classes = 4, 10
    _, res, _ = phi_values[version]

    run_device = "cuda" if torch.cuda.is_available() else "cpu"
    x = torch.randn(num_examples, 3, res, res).to(run_device)
    model = EfficientNet(version=version, num_classes=num_classes).to(run_device)
    print(model(x).shape)

In [6]:
# Smoke test: runs one random batch through EfficientNet-B0 and prints the output shape.
test()

torch.Size([4, 10])


## Loading the dataset

In [7]:
# One preprocessing pipeline per split. All splits resize to 224x224, convert to
# a tensor and normalise with the same per-channel mean/std; only 'train' adds a
# random horizontal flip.
data_transforms = {
    split: transforms.Compose([
        transforms.Resize((224, 224)),
        *([transforms.RandomHorizontalFlip()] if split == 'train' else []),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])
    for split in ('train', 'val', 'test')
}

In [8]:
# Root folder with one sub-folder per class; images are loaded with the
# training transform.
data_dir = 'cats_vs_dogs_mini_dataset'
dataset = datasets.ImageFolder(root=data_dir, transform=data_transforms['train'])

In [9]:
# 70 / 15 / 15 split; the test share absorbs whatever the truncation leaves over.
train_size = int(len(dataset) * 0.7)
val_size = int(len(dataset) * 0.15)
test_size = len(dataset) - (train_size + val_size)

In [10]:
# Seed the split so the same images land in train/val/test on every run.
# Without a fixed generator, a checkpoint reloaded in a new session could be
# evaluated on images it was trained on.
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(42),
)

In [11]:
# The Subsets returned by random_split all point at the SAME ImageFolder, so
# assigning `val_dataset.dataset.transform` would also replace the training
# transform (and drop its random flip). Give the validation and test subsets
# their own ImageFolder, reusing the already-drawn indices so split membership
# is unchanged. The training subset keeps the original dataset, which was
# created with the 'train' transform.
val_dataset = torch.utils.data.Subset(
    datasets.ImageFolder(data_dir, transform=data_transforms['val']),
    val_dataset.indices,
)
test_dataset = torch.utils.data.Subset(
    datasets.ImageFolder(data_dir, transform=data_transforms['test']),
    test_dataset.indices,
)

In [12]:
batch_size = 24

# Only the training data is shuffled. Validation and test loaders keep a fixed
# order: the reported loss is a mean over batches and the last batch is smaller,
# so shuffling would make it vary from run to run.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [13]:
# Sanity check: pull a single training batch and show its tensor shape, its
# labels and the class-name -> index mapping.
train_data_iter = iter(train_loader)
images, labels = next(train_data_iter)
print(images.shape, labels, train_dataset.dataset.class_to_idx, sep="\n")

torch.Size([24, 3, 224, 224])
tensor([1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0])
{'cats_set': 0, 'dogs_set': 1}


In [14]:
# Class names of the dataset that backs the training subset.
train_dataset.dataset.classes

['cats_set', 'dogs_set']

In [15]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda


In [16]:
# Build the "b0" variant with one output logit per class in the dataset and
# move it to the selected device.
model = EfficientNet("b0", num_classes=len(train_dataset.dataset.classes)).to(device)

In [17]:
# Cross-entropy loss over the class logits, optimised with Adam at lr = 1e-4.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

## Training

In [18]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10, patience=3, save_path="saved_best_models/efficientnet_best_model.pth"):
    """Train ``model`` and checkpoint the weights with the lowest validation loss.

    Each epoch runs one training pass and one validation pass, prints the
    losses and accuracies, and saves ``model.state_dict()`` to ``save_path``
    whenever the validation loss improves on the best seen so far.

    Args:
        model: Network to train; batches are moved to the device of its
            parameters.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating ``model``'s parameters.
        num_epochs: Number of epochs to run.
        patience: Currently unused. Early stopping is disabled and all
            ``num_epochs`` epochs always run; the parameter is kept so existing
            calls keep working.
        save_path: Destination of the best checkpoint. Missing parent
            directories are created.

    Returns:
        None. After the call ``model`` holds the weights of the LAST epoch;
        the best weights are only on disk at ``save_path``.
    """
    # Follow the model's own device rather than a notebook-level `device`
    # global, so the function also works when called from a fresh context.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    # torch.save does not create missing directories; create them up front so
    # the first improved epoch does not fail at save time.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    best_val_loss = float('inf')  # Any real loss is lower than this.

    for epoch in range(num_epochs):
        train_loss, train_accuracy = _run_epoch(
            model, train_loader, criterion, run_device, optimizer=optimizer
        )
        val_loss, val_accuracy = _run_epoch(model, val_loader, criterion, run_device)

        # Keep only the checkpoint with the lowest validation loss.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), save_path)
            print(f"Best model saved with val_loss: {best_val_loss:.4f}")

        print(f"Epoch [{epoch + 1}/{num_epochs}], "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")


def _run_epoch(model, loader, criterion, run_device, optimizer=None):
    """Run one pass over ``loader``; return (mean batch loss, accuracy in %).

    With an ``optimizer`` the model is put in train mode and updated after each
    batch, with a progress bar. Without one it is put in eval mode and run with
    gradients disabled.
    """
    is_training = optimizer is not None
    model.train(is_training)
    batches = tqdm(loader) if is_training else loader

    loss_sum = 0.0
    correct = 0
    total = 0
    with torch.set_grad_enabled(is_training):
        for images, labels in batches:
            images, labels = images.to(run_device), labels.to(run_device)
            if is_training:
                optimizer.zero_grad()

            outputs = model(images)
            loss = criterion(outputs, labels)
            loss_sum += loss.item()

            if is_training:
                loss.backward()
                optimizer.step()

            # The predicted class is the index of the largest logit.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # The loss is averaged over batches, the accuracy over samples.
    return loss_sum / len(loader), 100 * correct / total

In [19]:
# Train for a fixed number of epochs with the default checkpoint location.
num_epochs = 10
train_model(
    model, train_loader, val_loader, criterion, optimizer, num_epochs=num_epochs,
)

100%|██████████| 30/30 [00:48<00:00,  1.63s/it]


Best model saved with val_loss: 0.6951
Epoch [1/10], Train Loss: 0.7113, Train Acc: 52.43%, Val Loss: 0.6951, Val Acc: 48.67%


100%|██████████| 30/30 [00:37<00:00,  1.24s/it]


Epoch [2/10], Train Loss: 0.6984, Train Acc: 55.86%, Val Loss: 0.7206, Val Acc: 48.67%


100%|██████████| 30/30 [00:39<00:00,  1.31s/it]


Best model saved with val_loss: 0.6872
Epoch [3/10], Train Loss: 0.6733, Train Acc: 58.00%, Val Loss: 0.6872, Val Acc: 56.67%


100%|██████████| 30/30 [00:37<00:00,  1.24s/it]


Best model saved with val_loss: 0.6312
Epoch [4/10], Train Loss: 0.6712, Train Acc: 58.57%, Val Loss: 0.6312, Val Acc: 56.67%


100%|██████████| 30/30 [00:34<00:00,  1.14s/it]


Epoch [5/10], Train Loss: 0.6657, Train Acc: 61.29%, Val Loss: 0.6533, Val Acc: 62.00%


100%|██████████| 30/30 [00:35<00:00,  1.17s/it]


Epoch [6/10], Train Loss: 0.6409, Train Acc: 62.57%, Val Loss: 0.6965, Val Acc: 53.33%


100%|██████████| 30/30 [00:36<00:00,  1.23s/it]


Epoch [7/10], Train Loss: 0.5471, Train Acc: 70.14%, Val Loss: 0.7412, Val Acc: 58.67%


100%|██████████| 30/30 [00:35<00:00,  1.17s/it]


Epoch [8/10], Train Loss: 0.5061, Train Acc: 76.57%, Val Loss: 0.7454, Val Acc: 56.67%


100%|██████████| 30/30 [00:35<00:00,  1.17s/it]


Epoch [9/10], Train Loss: 0.4870, Train Acc: 77.29%, Val Loss: 0.7216, Val Acc: 62.67%


100%|██████████| 30/30 [00:35<00:00,  1.20s/it]


Epoch [10/10], Train Loss: 0.4031, Train Acc: 82.86%, Val Loss: 0.8042, Val Acc: 56.00%


## Test set evaluation

In [20]:
def evaluate_model(model, test_loader):
    """Print the classification accuracy of ``model`` over ``test_loader``.

    Args:
        model: Network to evaluate. It is switched to eval mode and left there.
        test_loader: Iterable of ``(images, labels)`` batches.

    Returns:
        None. The accuracy is printed as a percentage.
    """
    # Follow the model's own device rather than a notebook-level `device`
    # global, so the function works regardless of which cells have been run.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(run_device), labels.to(run_device)
            outputs = model(images)
            # The predicted class is the index of the largest logit.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_accuracy = 100 * correct / total
    print(f"Test Accuracy: {test_accuracy:.2f}%")

# Run evaluation on the test split.
# NOTE(review): `model` still holds the weights from the last training epoch at
# this point; the best checkpoint written by train_model is only loaded later,
# inside infer().
evaluate_model(model, test_loader)

Test Accuracy: 55.33%


## Loading the best model and running inference

In [21]:
def load_model(model, load_path="best_model.pth"):
    """Load a saved ``state_dict`` into ``model`` and switch it to eval mode.

    Args:
        model: Module whose parameters are overwritten in place.
        load_path: Path to a checkpoint written with
            ``torch.save(model.state_dict(), ...)``.

    Returns:
        The same ``model`` instance, in eval mode.
    """
    # Map the stored tensors onto the model's device so a checkpoint saved on a
    # GPU can still be loaded on a CPU-only machine.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")
    state_dict = torch.load(load_path, map_location=target_device, weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()
    return model

In [22]:
def infer(model, image, load_path="saved_best_models/efficientnet_best_model.pth"):
    """Predict the class index of a single preprocessed image.

    The checkpoint at ``load_path`` is loaded into ``model`` on every call, so
    ``model``'s weights are overwritten as a side effect.

    Args:
        model: Network matching the architecture of the checkpoint.
        image: Image tensor without a batch dimension; one is added before the
            forward pass.
        load_path: Checkpoint to load. The default uses forward slashes, which
            work on Windows as well as POSIX, and matches the default save
            location of train_model.

    Returns:
        The predicted class index as a Python int.
    """
    model = load_model(model, load_path=load_path)
    # Run on the model's own device rather than a notebook-level `device` global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")
    batch = image.to(target_device).unsqueeze(0)
    with torch.no_grad():
        output = model(batch)
        _, predicted = torch.max(output, 1)
    return predicted.item()

In [23]:
def load_image(image_path):
    """Read an image file and apply the 'test' preprocessing pipeline.

    Args:
        image_path: Path to the image file.

    Returns:
        The result of ``data_transforms['test']`` applied to the RGB image.
    """
    # Open the file in a context manager so its handle is closed as soon as the
    # pixels have been converted; `convert` returns an independent image.
    with Image.open(image_path) as source_image:
        rgb_image = source_image.convert('RGB')
    return data_transforms['test'](rgb_image)

In [25]:
# Build the path with os.path.join: the previous non-raw string relied on "\c"
# being an invalid escape sequence (a warning on recent Python versions) and
# only resolved on Windows.
image_path = os.path.join("cats_vs_dogs_mini_dataset", "cats_set", "cat.4001.jpg")

# Map class index -> short label, e.g. 'cats_set' -> 'cats'.
class_index = {
    index: folder_name.split("_")[0]
    for folder_name, index in train_dataset.dataset.class_to_idx.items()
}

image_tensor = load_image(image_path)
# infer() loads the best saved checkpoint into `model` before predicting.
predicted_label = infer(model, image_tensor)

print("Predicted Class:", class_index[predicted_label])

Predicted Class: cats
