In [1]:
import pandas as pd
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import os
from PIL import Image
import numpy as np

from tqdm import tqdm
import time

# Pick the compute device once: CUDA when torch reports it as available, CPU otherwise.
_device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(_device_name)

ModuleNotFoundError: No module named 'torchvision'

### Class to load the images

In [None]:
class ImageProvider():
    """Eagerly loads a collection of image files into memory as RGB images."""

    def __init__(self, image_paths):
        """Open every path in ``image_paths`` and store its RGB conversion.

        Args:
            image_paths: iterable of paths accepted by ``Image.open``.
        """

        def _load_rgb(path):
            # The opened file is closed when the with-block exits; only the
            # result of convert('RGB') is kept.
            with Image.open(path) as opened:
                return opened.convert('RGB')

        self.images = [_load_rgb(path) for path in image_paths]

    def getImages(self):
        """Return the list of loaded images (the stored list itself, not a copy)."""
        return self.images

### Dataset class that applies the preprocessing transform (e.g. resizing) to each image

In [None]:
class CustomDataset(Dataset):
    """Dataset over an in-memory sequence of images with an optional transform.

    Args:
        images: indexable collection of images. Items may be numpy arrays
            (converted to PIL images on access) or objects that the transform
            can consume directly, such as PIL images.
        transform: optional callable applied to each item before it is returned.
    """

    def __init__(self, images, transform=None):
        self.images = images
        self.transform = transform

    def __len__(self):
        """Return the number of stored images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the item at ``idx``, converted and transformed as configured."""
        image = self.images[idx]

        # Only raw numpy arrays need converting; items that are already PIL
        # images (as produced by ImageProvider) are passed through untouched
        # instead of being round-tripped through Image.fromarray.
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        # Compare against None so that a callable which happens to be falsy
        # (e.g. an empty container-like transform) is still applied.
        if self.transform is not None:
            image = self.transform(image)

        return image

### Data class for actual training and testing

In [None]:
class Data(Dataset):
    """Tabular dataset built from a DataFrame whose last column is the label.

    Every column except the last is treated as a feature and stored as a
    float32 tensor; the last column is stored as an int64 label tensor.
    """

    def __init__(self,data):
        """Split ``data`` into feature and label tensors.

        Args:
            data: pandas DataFrame; columns ``0 .. n-2`` are features and the
                final column holds integer class labels.
        """
        n = data.shape[1]
        # Cast features to float32 directly. Going through an integer dtype
        # first would truncate the fractional part of real-valued features.
        self.features = torch.tensor(data.iloc[:, 0:n-1].values.astype(np.float32), dtype=torch.float32)
        self.labels = torch.tensor(data.iloc[:, -1].values.astype(np.int64), dtype=torch.int64)

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair at ``index``."""
        return self.features[index], self.labels[index]

    def __len__(self):
        """Return the number of rows."""
        return len(self.features)

### Classifier model class

In [None]:
class Classifier(nn.Module):
    """One-hidden-layer MLP that maps feature vectors to class logits.

    Args:
        input_size: number of input features per sample.
        hidden_size: width of the hidden layer.
        output_size: number of output units (one logit per class).
        dropout: dropout probability applied after the hidden layer.
    """

    def __init__(self, input_size, hidden_size, output_size, dropout:float = 0.3):
        super().__init__()

        # Hidden block: Linear -> ReLU -> BatchNorm1d -> Dropout.
        self.layer1 = nn.Sequential(
            nn.Linear(in_features=input_size, out_features=hidden_size),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_size),
            nn.Dropout(dropout),
        )

        self.output_layer = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Return the output-layer activations for ``x`` (no softmax is applied)."""
        x = self.layer1(x)
        x = self.output_layer(x)

        return x

### AlexNet feature extractor

In [None]:
# Start from the pre-trained AlexNet and switch it to evaluation mode.
alexnet_model = models.alexnet(pretrained=True)
alexnet_model.eval()

# Keep every top-level child module except the last one and chain the rest.
# NOTE(review): the dropped child is presumably AlexNet's classifier head,
# leaving only the convolutional/pooling stages -- confirm against torchvision.
kept_stages = list(alexnet_model.children())[:-1]
alexnet_model = nn.Sequential(*kept_stages)

# The extractor is used as a fixed function: disable gradients on all weights.
for frozen_param in alexnet_model.parameters():
    frozen_param.requires_grad = False

### Converting images to features

In [None]:
image_paths = [...]  # placeholder: fill in with the image file paths
provider = ImageProvider(image_paths)

reqd_images = provider.getImages()

# Preprocessing applied to every image before it is fed to AlexNet:
# resize to 224x224, convert to a float tensor, then normalize each channel
# with the ImageNet mean/std that torchvision's pre-trained weights expect.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

labels = [...]  # placeholder: one class label per entry of image_paths

# shuffle=False keeps the extracted features in the same order as `labels`.
full_dataset = CustomDataset(reqd_images, transform=transform)
full_loader = DataLoader(full_dataset, batch_size=32, shuffle=False)

In [None]:
# Extract features for all images, one batch at a time, without tracking gradients.
feature_batches = []

with torch.no_grad():
    for images in full_loader:
        features = alexnet_model(images)
        # Flatten everything except the batch axis so each image yields one
        # feature vector. squeeze() would leave multi-dimensional outputs
        # untouched and would also drop the batch axis for a batch of size 1.
        feature_batches.append(torch.flatten(features, start_dim=1))

# Concatenate the per-batch features into a single (num_images, num_features) tensor
all_features = torch.cat(feature_batches, dim=0)

### Train Test Split to train model

In [None]:
# Hold out 20% of the samples for testing; the fixed random_state makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    all_features.numpy(),
    labels,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Wrap the label arrays as single-column frames.
Y_train, Y_test = pd.DataFrame(y_train), pd.DataFrame(y_test)

# Append the labels as the last column of each feature frame.
# NOTE: X_train / X_test are overwritten with the combined frames, so this cell
# is not idempotent -- re-run the split cell before running this one again.
X_train = pd.concat([pd.DataFrame(X_train), Y_train], axis=1)
X_test = pd.concat([pd.DataFrame(X_test), Y_test], axis=1)

In [None]:
# Wrap the combined (features + label) frames as tensor datasets.
train_dataset = Data(data=X_train)
test_dataset = Data(data=X_test)

# Shuffle only the training batches; evaluation does not depend on sample
# order, so the test loader is left unshuffled for deterministic iteration.
train_dataloader = DataLoader(dataset=train_dataset, batch_size=128, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=128, shuffle=False)

### Parameters of the model

In [None]:
# Run-control switches read by the training helpers.
sanity_check = False  # when True, the helpers stop after a single batch / epoch
n_epochs = 15

# Network dimensions: the input width follows the extracted feature vectors.
num_classes = 10
hidden_size = 256  # Adjust this as needed
input_size = all_features.size(1)

classifier = Classifier(input_size, hidden_size, num_classes)

# Loss function and optimizer used for training
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)

### Helper functions for Training and Validation

In [None]:
def train_epoch(model, dataloader, optimiser, criterion):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: module being trained; it is put into training mode.
        dataloader: iterable of batches whose first two items are ``(inputs, targets)``.
        optimiser: optimizer holding the model's parameters.
        criterion: loss callable invoked as ``criterion(model_output, targets)``.

    The criterion receives the raw model outputs (logits). Losses such as
    ``nn.CrossEntropyLoss`` apply log-softmax themselves, so passing softmax
    probabilities would normalise twice and flatten the gradients.

    When the module-level ``sanity_check`` flag is true, only the first batch
    is processed.
    """
    model.train()

    for batch in tqdm(dataloader):
        x, y = batch[0], batch[1]

        optimiser.zero_grad()

        logits = model(x)
        loss = criterion(logits, y)

        loss.backward()
        optimiser.step()

        if sanity_check:
            break

In [None]:
def validate(
    model,
    dataloader,
    criterion
):
    """Evaluate ``model`` on ``dataloader`` without updating it.

    Args:
        model: module to evaluate; it is put into evaluation mode.
        dataloader: iterable of batches whose first two items are ``(inputs, targets)``.
        criterion: loss callable invoked as ``criterion(model_output, targets)``.

    Returns:
        Tuple ``(mean_loss, accuracy, macro_f1)`` where ``mean_loss`` is the
        average of the per-batch losses over the batches actually processed.

    The criterion receives the raw model outputs (logits); losses such as
    ``nn.CrossEntropyLoss`` apply log-softmax internally. Predictions are the
    arg-max of the logits, which equals the arg-max of their softmax.

    When the module-level ``sanity_check`` flag is true, only the first batch
    is evaluated.
    """
    model.eval()
    loss_sum = 0.0
    batches_seen = 0
    predictions = []
    truths = []

    with torch.no_grad():
        for batch in tqdm(dataloader):
            x, y = batch[0], batch[1]

            logits = model(x)
            loss_sum += criterion(logits, y).item()
            batches_seen += 1

            # Collect plain Python ints rather than 0-d tensors for sklearn.
            predictions.extend(torch.argmax(logits, dim=-1).cpu().tolist())
            truths.extend(y.cpu().tolist())

            if sanity_check:
                break

    # Average over the batches that were really evaluated, so an early
    # sanity-check exit does not under-report the loss.
    total_loss = loss_sum / batches_seen if batches_seen else 0.0

    acc = accuracy_score(y_true=truths, y_pred=predictions)
    f1 = f1_score(y_true=truths, y_pred=predictions, average='macro')

    return total_loss, acc, f1

In [None]:
def train_model(
    model,
    train_dataloader,
    test_dataloader,
    optimiser,
    criterion
):
    """Train ``model`` for ``n_epochs`` epochs, reporting metrics after each one.

    Each epoch runs ``train_epoch`` on the training loader, then ``validate``
    on both the training and the test loader, and prints the elapsed time
    together with loss, accuracy and F1 for both splits.

    Reads the module-level ``n_epochs`` and ``sanity_check``; when
    ``sanity_check`` is true the loop stops after the first epoch.
    """
    for epoch in range(1, n_epochs+1):
        epoch_started = time.time()

        print(f"========= EPOCH {epoch} STARTED =========")
        train_epoch(model=model, dataloader=train_dataloader, optimiser=optimiser, criterion=criterion)

        print(f"========= TRAIN EVALUATION STARTED =========")
        train_loss, train_acc, train_f1 = validate(model=model, dataloader=train_dataloader, criterion=criterion)

        print(f"========= TEST EVALUATION STARTED =========")
        test_loss, test_acc, test_f1 = validate(model=model, dataloader=test_dataloader, criterion=criterion)

        # Per-epoch summary; elapsed time covers training plus both evaluations.
        print(f"END OF {epoch} EPOCH")
        print(f"| Time taken: {time.time() - epoch_started: 7.3f} |")
        print(f"| Train Loss: {train_loss: 7.3f} | Train acc: {train_acc: 1.5f} | Train f1: {train_f1: 1.5f} |")
        print(f"| Test Loss: {test_loss: 7.3f}  | Test acc: {test_acc: 1.5f}  | Test f1: {test_f1: 1.5f}  |")

        if sanity_check:
            break

### Train the model and report accuracies

In [None]:
# Kick off training. `criterion` is a required parameter of train_model and
# must be passed explicitly, otherwise the call raises TypeError.
train_model(
    model=classifier,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    optimiser=optimizer,
    criterion=criterion
)