In [1]:
# Source : https://www.youtube.com/watch?v=fR_0o25kigM&list=PLhhyoLH6IjfxeoooqP9rhU3HJIAVAJ3Vz&index=20
# original Author : Alladin Persson
import torch
import torch.nn as nn 

from math import ceil
base_model = [
    # Expand ratio, channels, repeats, stride, kernel_size
    [1, 16, 1, 1, 3],
    [6, 24, 2, 2, 3],
    [6, 40, 2, 2, 5],
    [6, 80, 3, 2, 3],
    [6, 112, 3, 1, 5],
    [6, 192, 4, 2, 5],
    [6, 320, 1, 1, 3],
]

phi_values = {
    # tuple of : (phi_value, resolution, drop_rate)
    "b0": (0, 224, 0.2), #alpha, gamma, depth
    "b1": (0.5, 240, 0.2),
    "b2": (1, 260, 0.3),
    "b3": (2, 300, 0.3),
    "b4": (3, 380, 0.4),
    "b5": (4, 456, 0.4),
    "b6": (5, 528, 0.5),
    "b7": (6, 600, 0.5),
}

In [2]:
class CNNBlock(nn.Module):
    def __init__(
        self, in_channels, out_channels, kernel_size, stride, padding, groups=1
    ):
        super(CNNBlock, self).__init__()
        self.cnn = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride,
            padding,
            groups=groups,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.silu = nn.SiLU()  # SiLU <-> Swish

    def forward(self, x):
        return self.silu(self.bn(self.cnn(x)))

In [3]:
class SqueezeExcitation(nn.Module):
    def __init__(self, in_channels, reduced_dim):
        super(SqueezeExcitation, self).__init__()
        
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d(1), # Cx H x W -. C x 1 x 1
            nn.Conv2d(in_channels, reduced_dim, 1),
            nn.SiLU(),
            nn.Conv2d(reduced_dim, in_channels, 1), 
            nn.Sigmoid()
        )
        
    def forward(self, x):
        return x * self.se(x)

In [4]:
class InvertedResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, expand_ratio, reduction = 4, survival_prob = 0.8):
        super(InvertedResidualBlock, self).__init__()
        self.survival_prob = survival_prob
        self.use_residual = in_channels == out_channels and stride ==1 
        hidden_dim = in_channels * expand_ratio
        self.expand = in_channels != hidden_dim
        reduced_dim = int(in_channels/reduction)
        
        if self.expand:
            self.expand_conv = CNNBlock(in_channels, hidden_dim, kernel_size=3, stride=1 , padding= 1)
        self.conv = nn.Sequential(CNNBlock(hidden_dim, hidden_dim, kernel_size, stride, padding, groups=hidden_dim),
                                  SqueezeExcitation(hidden_dim, reduced_dim),
                                  nn.Conv2d(hidden_dim, out_channels, 1, bias =False),
                                  nn.BatchNorm2d(out_channels))
    def stochastic_depth(self, x):
        if not self.training:
            return x
        binary_tensor = torch.rand(x.shape[0], 1, 1,1, device = x.device)< self.survival_prob
        return torch.div(x, self.survival_prob) * binary_tensor
    
    def forward(self, inputs):
        x = self.expand_conv(inputs) if self.expand else inputs
        
        if self.use_residual:
            return self.stochastic_depth(self.conv(x)) + inputs
        else:
            return self.conv(x)

In [5]:

class EfficientNet(nn.Module):
    def __init__(self, version, num_classes):
        super(EfficientNet, self).__init__()
        width_factor, depth_factor, dropout_rate = self.calculate_factors(version)
        last_channels = ceil(1280 * width_factor)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.features = self.create_features(width_factor, depth_factor, last_channels)
        self.classifier = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(last_channels, num_classes),
        )

    def calculate_factors(self, version, alpha=1.2, beta=1.1):
        phi, res, drop_rate = phi_values[version]
        depth_factor = alpha**phi
        width_factor = beta**phi
        return width_factor, depth_factor, drop_rate

    def create_features(self, width_factor, depth_factor, last_channels):
        channels = int(32 * width_factor)
        features = [CNNBlock(3, channels, 3, stride=2, padding=1)]
        in_channels = channels

        for expand_ratio, channels, repeats, stride, kernel_size in base_model:
            out_channels = 4 * ceil(int(channels * width_factor) / 4)
            layers_repeats = ceil(repeats * depth_factor)

            for layer in range(layers_repeats):
                features.append(
                    InvertedResidualBlock(
                        in_channels,
                        out_channels,
                        expand_ratio=expand_ratio,
                        stride=stride if layer == 0 else 1,
                        kernel_size=kernel_size,
                        padding=kernel_size // 2,  # if k=1:pad=0, k=3:pad=1, k=5:pad=2
                    )
                )
                in_channels = out_channels

        features.append(
            CNNBlock(in_channels, last_channels, kernel_size=1, stride=1, padding=0)
        )

        return nn.Sequential(*features)
    def forward(self, x):
        x = self.pool(self.features(x))
        return self.classifier(x.view(x.shape[0], -1))

In [6]:
def test():
    device = "cuda" if torch.cuda.is_available() else "cpu"
    version = "b0"
    phi, res, drop_rate = phi_values[version]
    num_examples, num_classes = 4, 10
    x = torch.randn((num_examples, 3, res, res)).to(device)
    model = EfficientNet(
        version=version,
        num_classes=num_classes,
    ).to(device)

    print(model(x).shape)  # (num_examples, num_classes)
test()

torch.Size([4, 10])


In [7]:
import os
from glob import glob 
import pandas as pd 

path_to_data = os.path.join("..", "cifar_data", "cifar-10-batches-py")
train_df =  pd.read_csv(os.path.join(path_to_data, "train.csv"))
test_df = pd.read_csv(os.path.join(path_to_data, "test.csv"))

In [8]:
train_df["label"].value_counts()

6    5000
9    5000
4    5000
1    5000
2    5000
7    5000
8    5000
3    5000
5    5000
0    5000
Name: label, dtype: int64

In [9]:
test_df["label"].value_counts()

3    1000
8    1000
0    1000
6    1000
1    1000
9    1000
5    1000
7    1000
4    1000
2    1000
Name: label, dtype: int64

In [10]:
import cv2
import torch
from numpy import array
from pandas import DataFrame
from torch.utils.data import Dataset, TensorDataset, DataLoader
from torchvision import transforms

def get_from_df_paths_targets( df: DataFrame, transform=None):
    """
    Get from dataframe paths and labels
    Args:
        df: Dataframe
        transform: transform
    Returns:
        df: Dataframe
        paths: paths
        labels: labels
        transform: transform
    """
    paths = df['path'].to_list()
    labels = df['label'].to_list()
    return df,paths, labels, transform

class CV2ImageDataset(Dataset):
    """
    OpenCV dataset to be used with albumentations
    """
    def __init__(self, df:DataFrame, transform: transforms =None, device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")):
        self.df, self.paths, self.labels, self.transform = get_from_df_paths_targets( df, transform=transform)
        self.device=device
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        image = cv2.imread(self.paths[idx])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        label = torch.tensor(int(self.labels[idx]))
        if self.transform is not None:
            image = self.transform(image=image)["image"]
        return image, label
    
    
class DatasetLoader():
    """
    Data Loader object
    """
    def __init__(self, dataset: Dataset, batch_size:int = 1 , num_workers: int =1):
        self.dataset = dataset
        self.batch_size = batch_size
        self.num_workers = num_workers
        #self.get_dataloader()
        
    def get_dataloader(self,):
        """
        Method to return dataloader
        Returns:
            self.loader: Dataset Loader
        """
        self.loader = DataLoader(
                    self.dataset,
                    batch_size=self.batch_size,
                    num_workers=self.num_workers,
                    shuffle=True
                )
        return self.loader
    
    def check_dataloader_dimension(self):
        """
        Prints out the dimension of dataloader
        """
        for _, (data, target) in enumerate(self.loader):
            print('Data Shape of Dataloader is (data, target) : ', data.shape, target.shape)
            print('Data Type of Dataloader is (data, target) : ', type(data), type(target))
            torch.cuda.empty_cache()
            break



In [11]:
import albumentations as A
from albumentations.pytorch import ToTensorV2
batch_size = 4
aug = A.Compose([   
A.Resize(224, 224),
A.HorizontalFlip(p=0.5),          
A.Normalize(),            
ToTensorV2()])

In [12]:
train_dataset = CV2ImageDataset(train_df, transform=aug)
test_dataset = CV2ImageDataset(test_df, transform=aug)

In [13]:
train_loader = DatasetLoader(train_dataset, batch_size, num_workers=0).get_dataloader()
test_loader = DatasetLoader(test_dataset, batch_size, num_workers=0).get_dataloader()

In [14]:
import torch.optim as optim
from torch.optim.lr_scheduler import CyclicLR
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 10
model = EfficientNet(
        version="b0",
        num_classes=10,
    ).to(device)
optimizer = optim.SGD(model.parameters(), lr = 0.1)
scheduler = scheduler = CyclicLR(optimizer, base_lr=0.001, max_lr=0.1, step_size_up=2000)
criterion = nn.CrossEntropyLoss()

In [15]:
num_epochs = 10
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode

    running_loss = 0.0
    correct_predictions = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Calculate loss
        loss = criterion(outputs, labels)

        # Backpropagation and optimization
        loss.backward()
        optimizer.step()

        # Update statistics
        running_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == labels).sum().item()

    # Calculate epoch statistics
    epoch_loss = running_loss / len(train_dataset)
    epoch_accuracy = correct_predictions / len(train_dataset)

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

    scheduler.step()
    
    # Evaluation on the test set
    model.eval()  # Set the model to evaluation mode

    test_loss = 0.0
    test_correct_predictions = 0

    with torch.no_grad():
        for test_inputs, test_labels in test_loader:
            test_inputs, test_labels = test_inputs.to(device), test_labels.to(device)

            # Forward pass
            test_outputs = model(test_inputs)

            # Calculate test loss
            test_loss += criterion(test_outputs, test_labels).item() * test_inputs.size(0)

            # Calculate test accuracy
            _, test_predicted = torch.max(test_outputs, 1)
            test_correct_predictions += (test_predicted == test_labels).sum().item()

    # Calculate test set statistics
    test_loss /= len(test_dataset)
    test_accuracy = test_correct_predictions / len(test_dataset)

    print(f"Test Set - Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}")

print("Training finished.")

Epoch [1/10], Loss: 1.7337, Accuracy: 0.3763
Test Set - Loss: 1.0755, Accuracy: 0.6199
Epoch [2/10], Loss: 1.1946, Accuracy: 0.5956
Test Set - Loss: 0.7880, Accuracy: 0.7324
Epoch [3/10], Loss: 0.9508, Accuracy: 0.6908
Test Set - Loss: 0.6594, Accuracy: 0.7758
Epoch [4/10], Loss: 0.8050, Accuracy: 0.7375
Test Set - Loss: 0.5886, Accuracy: 0.8027
Epoch [5/10], Loss: 0.7049, Accuracy: 0.7705
Test Set - Loss: 0.5724, Accuracy: 0.8061
Epoch [6/10], Loss: 0.6263, Accuracy: 0.7962
Test Set - Loss: 0.4934, Accuracy: 0.8332
Epoch [7/10], Loss: 0.5672, Accuracy: 0.8160
Test Set - Loss: 0.4740, Accuracy: 0.8426
Epoch [8/10], Loss: 0.5078, Accuracy: 0.8339
Test Set - Loss: 0.4664, Accuracy: 0.8478
Epoch [9/10], Loss: 0.4671, Accuracy: 0.8493
Test Set - Loss: 0.4577, Accuracy: 0.8503
Epoch [10/10], Loss: 0.4277, Accuracy: 0.8612
Test Set - Loss: 0.4516, Accuracy: 0.8443
Training finished.
