In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets,transforms
import torch.optim as optim
from torch.optim import lr_scheduler
import copy
import os
import numpy as np
import pandas as pd
from PIL import Image
from math import ceil


phi_values = {"b0": (0.5, 224, 0.6)}

b_model = [
    [1, 16, 1, 1, 3],
    [6, 24, 2, 2, 3],
    [6, 40, 2, 2, 5],
    [6, 80, 3, 2, 3],
    [6, 112, 3, 1, 5],
    [6, 192, 4, 2, 5],
    [6, 320, 1, 1, 3],
]

class CNNBlk(nn.Module):
    def __init__(self, in_channels, out_channels, kernel, stride, padding, groups=1):
        super(CNNBlk, self).__init__()
        self.cnn = nn.Conv2d(in_channels,out_channels,kernel,stride,padding,groups=groups,bias=False)
        self.bn = nn.BatchNorm2d(out_channels)
        self.silu = nn.SiLU()  

    def forward(self, x):
        return self.silu(self.bn(self.cnn(x)))

class SqueezeExcitation(nn.Module):
    def __init__(self, in_channels, re_dim):
        super(SqueezeExcitation, self).__init__()
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d(1), 
            nn.Conv2d(in_channels, re_dim, 1),
            nn.SiLU(),
            nn.Conv2d(re_dim, in_channels, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        return x * self.se(x)

# Residue blk

class Block(nn.Module):
    def __init__(self,in_channels,out_channels,kernel,stride,padding,expand_ratio,reduction=4,survival_prob=0.8,):
        super(Block, self).__init__()
        self.sur_prob = 0.8
        self.use_res = in_channels == out_channels and stride == 1
        hid_dim = in_channels * expand_ratio
        self.expand = in_channels != hid_dim
        re_dim = int(in_channels / reduction)

        if self.expand:
            self.expand_conv = CNNBlk(in_channels,hid_dim,kernel=3,stride=1,padding=1)

        self.conv = nn.Sequential(
            CNNBlk(hid_dim,hid_dim,kernel,stride,padding,groups=hid_dim),
            SqueezeExcitation(hid_dim, re_dim),
            nn.Conv2d(hid_dim, out_channels, 1, bias=False),
            nn.BatchNorm2d(out_channels),
        )

    def s_depth(self, x):
        if not self.training:
            return x
        bin_tensor = (torch.rand(x.shape[0], 1, 1, 1, device=x.device) < self.sur_prob)
        return torch.div(x, self.sur_prob) * bin_tensor

    def forward(self, inputs):
        x = self.expand_conv(inputs) if self.expand else inputs
        if self.use_res:
            return self.s_depth(self.conv(x)) + inputs
        else:
            return self.conv(x)
        
# Implementing EfficientNet
class Net(nn.Module):
    def __init__(self, num_classes):
        super(Net, self).__init__()
        width, depth, dropout_rate = self.cal()
        last = ceil(1280 * width)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.features = self.create_ft(width, depth, last)
        self.classifier = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(last, num_classes),
        )

    def cal(self,alpha=1.2, beta=1.1):
        phi, res, drop_rate = phi_values['b0']
        depth = alpha**phi
        width = beta**phi
        return width, depth, drop_rate

    def create_ft(self, width, depth, last):
        channels = int(32 * width)
        features = [CNNBlk(3, channels, 3, stride=2, padding=1)]
        in_channels = channels

        for expand_ratio, channels, repeats, stride, kernel in b_model:
            out_channels = 4 * ceil(int(channels * width) / 4)
            layers_repeats = ceil(repeats * depth)
            for layer in range(layers_repeats):
                features.append(
                    Block(in_channels,out_channels,expand_ratio=expand_ratio,stride=stride if layer == 0 else 1,kernel=kernel,
                    padding=kernel // 2, 
                    )
                )
                in_channels = out_channels

        features.append(CNNBlk(in_channels, last, kernel=1, stride=1, padding=0))
        return nn.Sequential(*features)
    
    def forward(self, x):
        x = self.pool(self.features(x))
        return self.classifier(x.view(x.shape[0], -1))

In [None]:
import gc
gc.collect()
torch.cuda.empty_cache()

In [None]:
# Data augmentation
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomRotation(8),
        transforms.ColorJitter(.2,.2,.2),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomGrayscale(p=0.1),
#         transforms.RandomVerticalFlip(p=0.5),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
}

# def balance_classes(images, classes):
#     count_per_class = [0] * classes
#     for _, image_class in images:
#         count_per_class[image_class] += 1
#     weight_per_class = [0.] * classes
#     for i in range(classes):
#         weight_per_class[i] = 1 / float(count_per_class[i])
#     return weight_per_class


data_dir = '/kaggle/input/aist4010-spring2023-a1/data/'

image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir,x ),data_transforms[x]) for x in ['train', 'val']}
# weights = {x: balance_classes(image_datasets[x].imgs, len(image_datasets[x].classes)) for x in ['train', 'val']}                                                                
# weights = {x: torch.DoubleTensor(weights[x]) for x in ['train', 'val']} 
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}

# sampler = {x: torch.utils.data.sampler.WeightedRandomSampler(weights[x], 6000) for x in ['train', 'val']}

# dataloaders = {x: DataLoader(image_datasets[x],batch_size=4,sampler=sampler[x],num_workers=2) for x in ['train', 'val']}
dataloaders = {x: DataLoader(image_datasets[x],batch_size=96,shuffle=True,num_workers=2) for x in ['train', 'val']}

class_names = image_datasets['train'].classes
if torch.cuda.is_available(): 
     device = "cuda" 
else: 
     device = "cpu"

In [None]:
# class EarlyStopper:
#     def __init__(self, patience=1, min_delta=0):
#         self.patience = patience
#         self.min_delta = min_delta
#         self.counter = 0
#         self.min_validation_loss = np.inf

#     def early_stop(self, validation_loss):
#         if validation_loss < self.min_validation_loss:
#             self.min_validation_loss = validation_loss
#             self.counter = 0
#         elif validation_loss > (self.min_validation_loss + self.min_delta):
#             self.counter += 1
#             if self.counter >= self.patience:
#                 return True
#         return False

In [None]:
def train_model(model, criterion, optimizer,scheduler, num_epochs=10):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train() 
            else:
                model.eval()   

            running_loss = 0.0
            running_corrects = 0
            correct = 0
            total = 0
            for inputs, labels in dataloaders[phase]:   
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()
    
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs.data, 1)
                    loss = criterion(outputs, labels)
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                    total += labels.size(0)
                    correct += (preds == labels).sum().item()
                epoch_acc = correct / total
                running_loss += loss.item() * inputs.size(0)
                
            if phase =='train':
                scheduler.step()
                
            epoch_loss = running_loss / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

                
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    
        print()
        
    print(f'Best val Acc: {best_acc:4f}')

    torch.save(best_model_wts, "./trained_res.pt")
    model.load_state_dict(best_model_wts)
    return model

In [None]:
if __name__ == '__main__':            
    model_ft = Net(num_classes=1000)
    model_ft = model_ft.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.1, momentum = 0.9)
    lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size = 20, gamma=0.2)
    model_eff = train_model(model_ft, criterion, optimizer_ft, lr_scheduler, num_epochs=80)
    
    folder = '/kaggle/input/aist4010-spring2023-a1/data/test'
    transform = transforms.Compose([transforms.Resize((224, 224)),
                                    transforms.ToTensor(),
                                    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])
    
    id_list = []
    pred_list = []
    label = []
    idx_to_class = {v:k for k, v in image_datasets['train'].class_to_idx.items()}
    with torch.no_grad():
        for filename in os.listdir(folder):
            img = Image.open((os.path.join(folder,filename)))
            image = img.convert("RGB")
            tensor = transform(img)
            tensor = tensor.cuda()
            tensor = tensor.unsqueeze(0)
            prediction = model_eff(tensor)
            _, predicted = torch.max(prediction.data, 1)
            label = idx_to_class[predicted.cpu().numpy()[0]]
            pred_list.append(label)
            id_list.append(filename)
    data = {"id":id_list, "label":pred_list}
    df = pd.DataFrame(data)
    df.to_csv('submission.csv',index=False)