In [None]:
from google.colab import drive
drive.mount("/content/gdrive")

In [51]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
from torch.utils.data import Dataset, DataLoader, random_split, WeightedRandomSampler
import numpy as np
from PIL import Image
from torch.optim import lr_scheduler
import copy
class CustomDataset(Dataset):
    def __init__(self, img_dir, label_file, transform=None):
        self.img_dir = img_dir
        # Directly listing files without sorting
        self.img_paths = [os.path.join(img_dir, f) for f in os.listdir(img_dir) if f.endswith('.jpg') or f.endswith('.png')]
        self.labels = self.load_labels(label_file)
        self.transform = transform

    def load_labels(self, label_file):
        with open(label_file, 'r') as file:
            labels = [int(line.strip()) - 1 for line in file.readlines()]  # Ensure labels are zero-indexed
        return labels

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        image = Image.open(self.img_paths[idx]).convert('RGB')
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

def calculate_weights(dataset):
    labels = [label for _, label in dataset]
    c_counts = np.zeros(3)
    for label in labels:
        c_counts[label] += 1
    weights = np.zeros(len(labels))
    for i, label in enumerate(labels):
        weights[i] = len(labels) / (3.0 * c_counts[label])
    return torch.DoubleTensor(weights)








In [52]:
# Data transformations
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                            std=[0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])
}

# Directory setup
data_dir = "/content/gdrive/My Drive/dataset"
train_dataset = CustomDataset(
    img_dir=os.path.join(data_dir, 'training'),
    label_file=os.path.join(data_dir, 'training_labels.txt'),
    transform=data_transforms['train']
)

test_dataset = CustomDataset(
    img_dir=os.path.join(data_dir, 'test'),
    label_file=os.path.join(data_dir, 'test_labels.txt'),
    transform=data_transforms['test']
)

# Split train dataset into train and validation
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

# Dataloaders with sampler for the training set
weights = {
    'train': calculate_weights(train_dataset),
    'val': calculate_weights(val_dataset),
    'test': calculate_weights(test_dataset)
}


samplers = {
    x: WeightedRandomSampler(weights[x], len(weights[x]))
    for x in ['train', 'val', 'test']
}



In [53]:
# Model setup
model_conv = models.alexnet(pretrained=True)
for param in model_conv.parameters():
    param.requires_grad = False



no_classes = 3
#set the last layer, to be 3-classed
model_conv.classifier[6] = nn.Linear(model_conv.classifier[6].in_features, no_classes)

criterion = nn.CrossEntropyLoss()
optimizer_conv = optim.SGD(model_conv.parameters(), lr=0.001, momentum=0.9)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_conv, step_size=1, gamma=0.1)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


model_conv = model_conv.to(device)




In [54]:
def train_model(model, criterion, optimizer, scheduler, num_epochs, device):
    dataloaders = {
    'train': DataLoader(train_dataset, batch_size=4, sampler=samplers['train'], shuffle=False),
    'val': DataLoader(val_dataset, batch_size=4,  shuffle=False),
    'test': DataLoader(test_dataset, batch_size=4,  shuffle=False)
}
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0  # Initialize the best accuracy

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())  # Update the best model weights

        scheduler.step()

    print('Best validation Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [55]:


# Train and evaluate the model
model_conv = train_model(model_conv, criterion, optimizer_conv, exp_lr_scheduler, 25,device)


Epoch 1/25
----------
train Loss: 0.9878 Acc: 0.6757
val Loss: 1.4480 Acc: 0.7105
Epoch 2/25
----------
train Loss: 0.7708 Acc: 0.8176
val Loss: 0.3364 Acc: 0.8947
Epoch 3/25
----------
train Loss: 0.2284 Acc: 0.9324
val Loss: 0.3056 Acc: 0.8947
Epoch 4/25
----------
train Loss: 0.2890 Acc: 0.9392
val Loss: 0.3055 Acc: 0.8947
Epoch 5/25
----------
train Loss: 0.4320 Acc: 0.8716
val Loss: 0.3050 Acc: 0.8947
Epoch 6/25
----------
train Loss: 0.2203 Acc: 0.9324
val Loss: 0.3050 Acc: 0.8947
Epoch 7/25
----------
train Loss: 0.2413 Acc: 0.9122
val Loss: 0.3050 Acc: 0.8947
Epoch 8/25
----------
train Loss: 0.2798 Acc: 0.9189
val Loss: 0.3050 Acc: 0.8947
Epoch 9/25
----------
train Loss: 0.4476 Acc: 0.8716
val Loss: 0.3050 Acc: 0.8947
Epoch 10/25
----------
train Loss: 0.2435 Acc: 0.9054
val Loss: 0.3050 Acc: 0.8947
Epoch 11/25
----------
train Loss: 0.1972 Acc: 0.9189
val Loss: 0.3050 Acc: 0.8947
Epoch 12/25
----------
train Loss: 0.1218 Acc: 0.9595
val Loss: 0.3050 Acc: 0.8947
Epoch 13/25
-

In [56]:
model_conv.eval()

dataloaders = {
    'train': DataLoader(train_dataset, batch_size=4, shuffle=False),
    'val': DataLoader(val_dataset, batch_size=4, shuffle=False),
    'test': DataLoader(test_dataset, batch_size=4,  shuffle=False)
}

In [57]:
confusion_matrix = torch.zeros(3, 3)
with torch.no_grad():
  for x in ['train', 'test', 'val']:
    for i, (inputs, classes) in enumerate(dataloaders[x]):
      inputs = inputs.to(device)
      classes = classes.to(device)
      outputs = model_conv(inputs)
      _, preds = torch.max(outputs, 1)
      for t, p in zip(classes.view(-1), preds.view(-1)):
        confusion_matrix[t.long(), p.long()] += 1

    print(x)
    print(confusion_matrix)
    print(confusion_matrix.diag()/(confusion_matrix.sum(0) + confusion_matrix.sum(1) - confusion_matrix.diag()))
    print(confusion_matrix.diag().sum() / confusion_matrix.sum())

train
tensor([[43.,  2.,  2.],
        [ 1., 64.,  5.],
        [ 0.,  0., 31.]])
tensor([0.8958, 0.8889, 0.8158])
tensor(0.9324)
test
tensor([[ 84.,   5.,   6.],
        [  3., 111.,  13.],
        [  2.,   4.,  64.]])
tensor([0.8400, 0.8162, 0.7191])
tensor(0.8870)
val
tensor([[ 96.,   5.,   7.],
        [  3., 126.,  16.],
        [  2.,   4.,  71.]])
tensor([0.8496, 0.8182, 0.7100])
tensor(0.8879)
