In [1]:
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, random_split
import pytorch_lightning as pl
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import os
import torch


# LOAD DATA

In [2]:
def load_data(batch_size,img_size):
    augmentation = transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
    ])

    # Load the dataset
    data_path = "../../nature_12K/inaturalist_12K/"
    train_dataset = datasets.ImageFolder(os.path.join(data_path, 'train'), transform = augmentation)
    test_dataset = datasets.ImageFolder(os.path.join(data_path, 'val'), transform = augmentation)

    labels = train_dataset.classes
    trainset, valset = random_split(train_dataset, [8000, 1999])

    train_loader = DataLoader(trainset, batch_size = batch_size)
    val_loader = DataLoader(valset, batch_size = batch_size)
    test_loader = DataLoader(test_dataset, batch_size = batch_size)

    return labels , train_loader, val_loader, test_loader



# **Question - 1**

# **CONVOLUTIONAL NEURAL NETWOK CLASS**

In [6]:
from typing import Any


class ConvolutionalNeuralNetwork(nn.Module):
    def __init__(self, PARAM) -> None:
        super().__init__()
        self.flatten = nn.Flatten()
        self.filter_org = PARAM["filter_org"]
        self.filter_num = PARAM["filter_num"]
        self.activation = PARAM["activation"]
        self.con_layers = PARAM["con_layers"]
        self.den_layers = PARAM["dense_layers"]
        self.input_channel = PARAM["input_channel"]
        self.filter_num_list = self.organize_filters(self.filter_org, self.filter_num, self.con_layers)
        self.filter_size_list = PARAM["filter_size"]
        self.act = self.activation_fun(PARAM["activation"])
        self.output_act = self.activation_fun(PARAM["output_activation"])
        self.padding = PARAM["padding"]
        self.stride = PARAM["stride"]
        self.pool_padding = PARAM["pool_padding"]
        self.pool_stride = PARAM["pool_stride"]
        self.dense_output_list = PARAM["dense_output_list"]
        self.image_size = PARAM["image_size"]
        self.pool_filter_size = PARAM["pool_filter_size"]
        self.create_con_layers(self.input_channel, self.filter_size_list, self.dense_output_list, self.filter_num_list, self.act, self.pool_filter_size, self.output_act, self.image_size)
        
        
    def create_con_layers(self, input_channel, filter_size_list, dense_output_list, filter_num_list, act, pool_filter_size, output_act, image_size):
        self.layers = nn.ModuleList()
        computation = 0
        for i in range(1, self.con_layers+1):
            comp = 0
            layer = nn.Sequential(nn.Conv2d(input_channel, filter_num_list[i-1], filter_size_list[i-1], padding=self.padding, stride=self.stride), act, nn.MaxPool2d(pool_filter_size, padding=self.pool_padding, stride=self.pool_stride))
            
            image_size = (image_size - filter_size_list[i-1] + 2 * self.padding)//self.stride + 1
            
            comp = ((filter_size_list[i-1] ** 2) * input_channel * (image_size ** 2)*filter_num_list[i-1] + filter_num_list[i-1])
            computation += comp
            image_size = (image_size + 2 * self.pool_padding-(1*(pool_filter_size-1))-1)//self.pool_stride + 1
            # print(image_size)
            # print(comp)
            input_channel = filter_num_list[i-1]
            self.layers.append(layer)
        dense_input = filter_num_list[self.con_layers-1] * (image_size ** 2)
        for i in range(1, self.den_layers+1):
            comp = 0
            layer = nn.Sequential(nn.Linear(dense_input, dense_output_list[i-1]), act)
            comp = ((dense_input  + 1) * dense_output_list[i-1])
            computation += comp
            dense_input = dense_output_list[i-1]
            self.layers.append(layer)
            # print(computation)
            # print(comp)
        layer = nn.Sequential(nn.Linear(dense_input, 10), output_act)
        comp = ((dense_input  + 1) * 10)
        computation += comp
        # print(comp)
        print("Computation :: ", computation)
        self.layers.append(layer)


    def organize_filters(self, filter_org, filter_number, layers):
        if filter_org == "same":
            filter_num = [filter_number] * layers
        elif filter_org == "double":
            filter_num = [filter_number * (2 ** i) for i in range(layers)]
        elif filter_org == "half":
            filter_num = [int(filter_number * (2 ** (-i))) for i in range(layers)]
        return filter_num
    
    
    def activation_fun(self, act):
        if act == "ReLU":
            act_fun =nn.ReLU()
        elif act == "GELU":
            act_fun = nn.GELU()
        elif act == "SiLU":
            act_fun = nn.SiLU()
        elif act == "Mish":
            act_fun = nn.Mish()
        elif act == "softmax":
            act_fun = nn.Softmax(dim=1)
        elif act == "ELU":
            act_fun = nn.ELU()
        return act_fun
    
    
    def forward(self,x):
        for i in range(0, self.con_layers):
            x = self.layers[i](x)
        x = self.flatten(x)
        for i in range(0, self.den_layers):
            x = self.layers[i+self.con_layers](x)
        x = self.layers[self.con_layers + self.den_layers](x)
        return x   

    



In [7]:
PARAM = {
    "con_layers" : 5,
    "dense_layers" : 1,
    "filter_size" : [3] * 5,
    "output_activation" : "softmax", 
    "dense_output_list" : [32],
    "filter_num" : 16,
    "activation" : "Mish",
    "filter_org" : "half", #double half
    "input_channel" : 3,
    "padding" : 0,
    "stride" : 1,
    "pool_padding" : 0,
    "pool_stride" : 1,
    "image_size" : 256,
    "pool_filter_size" : 3
}

net = ConvolutionalNeuralNetwork(PARAM)
print("***********")
total = 0
for p in net.parameters():
    print(p.numel())
    total += p.numel()



print("Total Parameters", total)


Computation ::  124318385
***********
432
16
1152
8
288
4
72
2
18
1
1782272
32
320
10
Total Parameters 1784627


In [10]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr = 0.0001)
epochs = 10
labels, train_loader, val_loader, test_loader = load_data(256, PARAM["image_size"])
for epoch in range(epochs):
    net.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    count = 0
    print("image is loading")
    for images, labels in train_loader:
        print("image is loaded")
        optimizer.zero_grad()
        outputs = net(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        # print(outputs)
        
        # Calculate accuracy
        _, predicted = torch.max(outputs.data, 1)
        print(_, predicted)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        count += 1
        break
    
    print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}, Accuracy: {100 * correct / total}%")


image is loading
image is loaded
tensor([0.1526, 0.1525, 0.1529, 0.1530, 0.1525, 0.1524, 0.1526, 0.1522, 0.1521,
        0.1526, 0.1523, 0.1519, 0.1525, 0.1530, 0.1527, 0.1530, 0.1527, 0.1524,
        0.1529, 0.1527, 0.1528, 0.1529, 0.1526, 0.1533, 0.1530, 0.1528, 0.1526,
        0.1529, 0.1522, 0.1523, 0.1524, 0.1529, 0.1525, 0.1523, 0.1522, 0.1522,
        0.1523, 0.1525, 0.1530, 0.1519, 0.1525, 0.1521, 0.1523, 0.1523, 0.1527,
        0.1529, 0.1524, 0.1531, 0.1528, 0.1527, 0.1525, 0.1522, 0.1526, 0.1532,
        0.1522, 0.1526, 0.1526, 0.1524, 0.1521, 0.1524, 0.1527, 0.1524, 0.1528,
        0.1523, 0.1526, 0.1528, 0.1523, 0.1526, 0.1524, 0.1524, 0.1527, 0.1525,
        0.1525, 0.1524, 0.1527, 0.1526, 0.1524, 0.1524, 0.1528, 0.1525, 0.1527,
        0.1523, 0.1525, 0.1522, 0.1530, 0.1524, 0.1530, 0.1524, 0.1521, 0.1522,
        0.1523, 0.1526, 0.1528, 0.1525, 0.1529, 0.1526, 0.1526, 0.1526, 0.1526,
        0.1523, 0.1529, 0.1524, 0.1525, 0.1524, 0.1523, 0.1535, 0.1529, 0.1528,
       

In [8]:
API_KEY = "57566fbb0e091de2e298a4320d872f9a2b200d12"

In [16]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu
