In [110]:
import torch
import os
import pandas as pd
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.torch_version
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from PIL import Image
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary
from torchvision import transforms
from torchmetrics import Accuracy, Precision, Recall

In [111]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cuda device


In [112]:
# HyperParameters
deimention = 50
batch_size = 64

In [113]:
class CustomDataset(Dataset):
    def __init__(self, labels_file, imgs_dir, transform=None):
        self.labels = pd.read_csv(labels_file)
        self.imgs_dir = imgs_dir
        self.transform = transform
        self.mean_of_color_channels = None  # Initialize as None
        self.std_of_color_channels = None   # Initialize as None
        self._calculate_stats()

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.imgs_dir, self.labels.iloc[idx, 0])
        image = Image.open(img_path).convert("RGB")
        label = self.labels.iloc[idx, 1]

        # Calculate mean and standard deviation if not already done
        if self.mean_of_color_channels is None or self.std_of_color_channels is None:
            self._calculate_stats()

        # Apply transformation with calculated statistics
        if self.transform:
            image = self.transform(image)

        return image, label

    def _calculate_stats(self):
        # Calculate mean and standard deviation across all images in the dataset
        # (consider using a random subset for efficiency with large datasets)
        channels_sum, channels_squared_sum = np.zeros(3), np.zeros(3)
        for idx in range(len(self)):
            img_path = os.path.join(self.imgs_dir, self.labels.iloc[idx, 0])
            image = np.asarray(Image.open(img_path).convert("RGB"))
            # Convert to float for calculations
            image = image.astype(np.float32)

            # Update channel sums and squared sums
            channels_sum += np.sum(image, axis=(0, 1))
            channels_squared_sum += np.sum(image**2, axis=(0, 1))

        # Calculate mean and standard deviation
        num_images = len(self)
        self.mean_of_color_channels = channels_sum / (num_images * image.shape[0] * image.shape[1])
        self.std_of_color_channels = np.sqrt(channels_squared_sum / (num_images * image.shape[0] * image.shape[1]) - self.mean_of_color_channels**2)


In [114]:
# temp_dataset_for_color_value_extraction = CustomDataset('data/Train/Train.csv', 'data/Train/')
# print(temp_dataset_for_color_value_extraction.mean_of_color_channels, temp_dataset_for_color_value_extraction.std_of_color_channels)

In [115]:
mean_of_color_channels = tuple(item / 255 for item in [131.8612, 121.9361, 116.0298])
std_of_color_channels = tuple(item / 255 for item in [88.6042, 87.0262, 87.3879]) # TODO: there might be an issue here


train_original_transform = transforms.Compose([ 
    transforms.Resize((deimention, deimention)),
    transforms.ToTensor(),
    transforms.Normalize(mean_of_color_channels, std_of_color_channels),
])

train_transform_modified = transforms.Compose([ 
    transforms.Resize((deimention, deimention)),
    transforms.RandomRotation(20),
    transforms.RandomHorizontalFlip(1),
    transforms.ToTensor(),
    transforms.Normalize(mean_of_color_channels, std_of_color_channels),
])

test_transform = transforms.Compose([
    transforms.Resize((deimention, deimention)),
    transforms.ToTensor(),
    transforms.Normalize(mean_of_color_channels, std_of_color_channels),
])

In [116]:
training_dataset_original = CustomDataset('data/Train/Train.csv', 'data/Train/', transform=train_original_transform)
training_dataset_modified = CustomDataset('data/Train/Train.csv', 'data/Train/', transform=train_transform_modified)
validation_dataset = CustomDataset('data/Valid/Validation.csv', 'data/Valid/', transform=test_transform)
testing_dataset = CustomDataset('data/Test/Test.csv', 'data/Test/', transform=test_transform)

In [117]:
training_loader_original = DataLoader(training_dataset_original, batch_size=batch_size, shuffle=True)
training_loader_modified = DataLoader(training_dataset_modified, batch_size=batch_size, shuffle=True)

validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)
testing_loader = DataLoader(testing_dataset, batch_size=batch_size, shuffle=False)

In [118]:
class Model(nn.Module):
    def __init__(self,num_classes,device,dim = 32,num_epochs = 20,learning_rate = 0.001):
        super().__init__()
        self.num_of_classes = num_classes
        self.device = device
        self.dim = dim
        # Debugging
        self.DEBUG = False
        # Hyperparameters
        self.num_epochs = num_epochs
        self.learning_rate = learning_rate

        # History while Training
        self.model_loss_history = []
        self.model_train_acc_history = []
        self.model_val_acc_history = []
        self.model_val_precision_history = []
        self.model_val_recall_history = []
        self.model_lr_history = []

        # Model Attributes
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = None
        self.accuracy = Accuracy(task= 'multiclass', num_classes=self.num_of_classes, average='macro').to(self.device)
        self.precision = Precision(task= 'multiclass', num_classes=self.num_of_classes, average='macro').to(self.device)
        self.recall = Recall(task= 'multiclass', num_classes=self.num_of_classes, average='macro').to(self.device)
        # Model Architecture
        self.feature_extract = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=5, padding=2),
            nn.MaxPool2d(kernel_size=2),
            nn.Flatten(),
        )
        self.classifier = nn.Sequential(            
            nn.Linear(20000, self.num_of_classes),
        )
        
    def forward(self, x):
        x = self.feature_extract(x)
        x = self.classifier(x)
        return x
    
    def predict(self, img):
        '''
        returns the predicted classes for the given images
        '''
        self.eval()
        with torch.no_grad():
            img = img.to(self.device)
            output = self(img)
            _, predicted = torch.max(output, 1)
            return predicted
        

    
    def eval_val(self, data_loader):
        '''
        returns accuracy, precision and recall
        '''
        self.eval()
        with torch.no_grad():
            for images, labels in data_loader:
                
                images, labels = images.to(self.device), labels.to(self.device)
                outputs = self(images)
                self.accuracy(outputs, labels)
                self.precision(outputs, labels)
                self.recall(outputs, labels)

        return self.accuracy.compute(), self.precision.compute(), self.recall.compute()
    
    def train_model(self, train_loader, val_loader):
        
        last_accuracy = -100
        self.optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)

        for epoch in range(self.num_epochs):
            self.train()
            running_loss = 0.0

            for i, (images, labels) in enumerate(train_loader):

                images, labels = images.to(self.device), labels.to(self.device)
                self.optimizer.zero_grad()
                outputs = self(images)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()

                running_loss += loss.item()
                if i%100 == 0 and self.DEBUG:
                    print(" Step [{}/{}] Loss: {}".format(i, len(train_loader), loss.item()))
                    
            val_acc, val_precision, val_recall = self.eval_val(val_loader)
            train_acc, _, _ = self.eval_val(train_loader)

            self.model_loss_history.append(running_loss/len(train_loader))
            self.model_train_acc_history.append(train_acc.item())
            self.model_val_acc_history.append(val_acc.item())
            self.model_val_precision_history.append(val_precision.item())
            self.model_val_recall_history.append(val_recall.item())
            self.model_lr_history.append(self.optimizer.param_groups[0]['lr'])
            
            print(f'Epoch: {epoch+1}/{self.num_epochs}, Loss: {loss.item()},Train Acc: {train_acc}, Val Acc: {val_acc}, Val Precision: {val_precision}, Val Recall: {val_recall}')
            
            if val_acc > last_accuracy:
                last_accuracy = val_acc
            else:
                break
        
        self.save_model()
        print('Finished Training')

    def plot_history(self):
        # making two plots one for loss and other for accuracy
        fig, axs = plt.subplots(2, 3, figsize=(15, 10))
        fig.suptitle('Model Training History')
        axs[0, 0].plot(self.model_loss_history)
        axs[0, 0].set_title('Model Loss')
        axs[0, 0].set_xlabel('Epochs')
        axs[0, 0].set_ylabel('Loss')

        axs[0, 1].plot(self.model_train_acc_history, label='Train')
        axs[0, 1].plot(self.model_val_acc_history, label='Val')
        axs[0, 1].set_title('Model Accuracy')
        axs[0, 1].set_xlabel('Epochs')
        axs[0, 1].set_ylabel('Accuracy')
        axs[0, 1].legend()

        axs[1, 0].plot(self.model_val_precision_history)
        axs[1, 0].set_title('Model Precision')
        axs[1, 0].set_xlabel('Epochs')
        axs[1, 0].set_ylabel('Precision')
        
        axs[1, 1].plot(self.model_val_recall_history)
        axs[1, 1].set_title('Model Recall')
        axs[1, 1].set_xlabel('Epochs')
        axs[1, 1].set_ylabel('Recall')

        axs[0, 2].plot(self.model_lr_history)
        axs[0, 2].set_title('Learning Rate')
        axs[0, 2].set_xlabel('Epochs')
        axs[0, 2].set_ylabel('Learning Rate')
        
        
        # axs[1, 2].axis('off')

        plt.show()
    
    def save_model(self):
        torch.save(self.state_dict(),type(self).__name__+'.pth')

    def print_summary(self):
        summary(self, (3, self.dim, self.dim))


In [119]:
num_of_classes = 2
model_1 = Model(num_classes=num_of_classes, device=device, dim=deimention, num_epochs=500, learning_rate=0.001)
model_1.to(device)
model_1.print_summary()

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 32, 50, 50]           2,432
         MaxPool2d-2           [-1, 32, 25, 25]               0
           Flatten-3                [-1, 20000]               0
            Linear-4                    [-1, 2]          40,002
Total params: 42,434
Trainable params: 42,434
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.03
Forward/backward pass size (MB): 0.92
Params size (MB): 0.16
Estimated Total Size (MB): 1.11
----------------------------------------------------------------


In [120]:
model_1.train_model(training_loader_original, validation_loader)

Epoch: 1/500, Loss: 0.2639616131782532,Train Acc: 0.9053245782852173, Val Acc: 0.8902382254600525, Val Precision: 0.8784343004226685, Val Recall: 0.8902382254600525
Epoch: 2/500, Loss: 0.09624283760786057,Train Acc: 0.9143646955490112, Val Acc: 0.9017543792724609, Val Precision: 0.8920964002609253, Val Recall: 0.9017543792724609
Epoch: 3/500, Loss: 0.11315222829580307,Train Acc: 0.9283357858657837, Val Acc: 0.9131679534912109, Val Precision: 0.9077234268188477, Val Recall: 0.9131679534912109
Epoch: 4/500, Loss: 0.21034590899944305,Train Acc: 0.9352672100067139, Val Acc: 0.9256393909454346, Val Precision: 0.9208154678344727, Val Recall: 0.9256393909454346
Epoch: 5/500, Loss: 0.06211989000439644,Train Acc: 0.9385992288589478, Val Acc: 0.9326939582824707, Val Precision: 0.9304695129394531, Val Recall: 0.9326939582824707
Epoch: 6/500, Loss: 0.030699271708726883,Train Acc: 0.9425690174102783, Val Acc: 0.9365450143814087, Val Precision: 0.9363681674003601, Val Recall: 0.9365450143814087
Epoc

In [121]:
acc, prec, rec = model_1.eval_val(testing_loader)
print(f'Accuracy: {acc}, Precision: {prec}, Recall: {rec}')

Accuracy: 0.9790127277374268, Precision: 0.9795866012573242, Recall: 0.9790127277374268


In [ ]:
# model_1.train_model(training_loader_modified, validation_loader)

In [ ]:
# acc, prec, rec = model_1.eval_val(testing_loader)
# print(f'Accuracy: {acc}, Precision: {prec}, Recall: {rec}')