In [1]:
import torch
from torch.utils.data import Dataset,DataLoader
from tqdm import tqdm
from torchvision import transforms
import numpy as np
import pandas as pd
from PIL import Image
import argparse
import os
import copy
import torch
import cv2
from skimage.feature import hog

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
#performing transformation
LABELS_Severity = {35: 0,
                   43: 0,
                   47: 1,
                   53: 1,
                   61: 2,
                   65: 2,
                   71: 2,
                   85: 2}


mean = (.1706)
std = (.2112)
normalize = transforms.Normalize(mean=mean, std=std)

transform = transforms.Compose([
    transforms.Resize(size=(224,224)),
    transforms.ToTensor(),
    lambda x : x.expand(3,*x.shape[1:]),
    normalize,
])

In [3]:
class OCTDataset(Dataset):
    def __init__(self, args, subset='train', transform=None,):
        if subset == 'train':
            self.annot = pd.read_csv(args.annot_train_prime)
        elif subset == 'test':
            self.annot = pd.read_csv(args.annot_test_prime)
            
        self.annot['Severity_Label'] = [LABELS_Severity[drss] for drss in copy.deepcopy(self.annot['DRSS'].values)] 
        # print(self.annot)
        self.root = os.path.expanduser(args.data_root)
        self.transform = transform
        # self.subset = subset
        self.nb_classes=len(np.unique(list(LABELS_Severity.values())))
        self.path_list = self.annot['File_Path'].values
        self._labels = self.annot['Severity_Label'].values
        assert len(self.path_list) == len(self._labels)
        # idx_each_class = [[] for i in range(self.nb_classes)]

    def __getitem__(self, index):
        img, target = Image.open(self.root+self.path_list[index]).convert("L"), self._labels[index]

        if self.transform is not None:
            img = self.transform(img)

        return img, target

    def __len__(self):
        return len(self._labels)         

In [4]:
#creating a class to load all the required data
class NotebookArgs:
    def __init__(self, annot_train_prime = 'df_prime_train.csv', annot_test_prime = 'df_prime_test.csv', data_root = '/storage/home/hpaceice1/shared-classes/materials/ece8803fml/'):
        self.annot_train_prime = annot_train_prime
        self.annot_test_prime = annot_test_prime
        self.data_root = data_root
args = NotebookArgs()

In [5]:
#loading the test and train set
trainset = OCTDataset(args, 'train', transform=transform)
testset = OCTDataset(args, 'test', transform=transform)
batch_size = 32
train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(testset, batch_size=batch_size, shuffle=False)

In [6]:
#loading images and labels from the loader
def get_X_y_from_loader(loader):
    X, y = [], []
    for sample in tqdm(loader, total=len(loader)):
        images, labels = sample[0], sample[1]
        X.extend([a.numpy()[0] for a in images])
        y.extend([a.numpy().flatten() for a in labels])
    return X,y

In [7]:
X_train, y_train = get_X_y_from_loader(train_loader)

100%|██████████| 758/758 [01:19<00:00,  9.54it/s]


In [8]:
#Applying HOG feature extractor on the images
X_train_hog = []
for i in tqdm(range(len(X_train))):
    hog_features = hog(X_train[i], pixels_per_cell=(8, 8),
                              cells_per_block=(2, 2), orientations=2, block_norm='L2-Hys',
                              feature_vector=True)
    X_train_hog.append(hog_features)
print(X_train_hog[0].shape)
print(len(X_train_hog))

100%|██████████| 24252/24252 [09:10<00:00, 44.02it/s]

(5832,)
24252





In [9]:
X_test, y_test = get_X_y_from_loader(test_loader)

100%|██████████| 250/250 [00:25<00:00,  9.65it/s]


In [10]:
X_subset = X_train_hog
y_subset = y_train
y_subset = [i[0] for i in y_subset]
print(len(y_train))

24252


In [11]:
X_test_hog = []
for i in tqdm(range(len(X_test))):
    #X_hog = X_train[i].reshape((224,224))
    hog_features = hog(X_test[i], pixels_per_cell=(8, 8),
                              cells_per_block=(2, 2), orientations=2, block_norm='L2-Hys',
                              feature_vector=True)
    X_test_hog.append(hog_features)
print(X_test_hog[0].shape)
print(len(X_test_hog))

100%|██████████| 7987/7987 [03:01<00:00, 44.03it/s]

(5832,)
7987





In [12]:
y_test = [i[0] for i in y_test]

In [13]:
#applying feed forward network
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

# Define the number of input features
input_dim = len(X_subset[0])

# Define the number of output classes
num_classes = 3

# Convert the training and test sets to PyTorch tensors
X_train_tensor = torch.Tensor(X_subset)
y_train_tensor = torch.LongTensor(y_subset)
X_test_tensor = torch.Tensor(X_test_hog)
y_test_tensor = torch.LongTensor(y_test)

# Define the model architecture
model = nn.Sequential(
    nn.Linear(input_dim, 1024),
    nn.ReLU(),
    nn.Linear(1024,512),
    nn.ReLU(),
    nn.Linear(512,256),
    nn.ReLU(),
    nn.Linear(256,128),
    nn.ReLU(),
    nn.Linear(128,64),
    nn.ReLU(),
    nn.Linear(64,32),
    nn.ReLU(),
    nn.Linear(32,16),
    nn.ReLU(),
    nn.Linear(16, 8),
    nn.ReLU(),
    nn.Linear(8, num_classes),
    nn.Softmax(dim=1)
)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

# Define the training data loader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Train the model
num_epochs = 10
for epoch in range(num_epochs):
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
    print('Epoch:', epoch+1, 'Loss:', loss.item())

# Evaluate the model on the test data
with torch.no_grad():
    output = model(X_test_tensor)
    _, predicted = torch.max(output.data, 1)
    accuracy = (predicted == y_test_tensor).sum().item() / len(y_test)
    print('Test accuracy:', accuracy)


  X_train_tensor = torch.Tensor(X_subset)


Epoch: 1 Loss: 1.0046941041946411
Epoch: 2 Loss: 1.0673706531524658
Epoch: 3 Loss: 1.0759786367416382
Epoch: 4 Loss: 1.034498929977417
Epoch: 5 Loss: 0.9557835459709167
Epoch: 6 Loss: 0.9149312376976013
Epoch: 7 Loss: 1.0358635187149048
Epoch: 8 Loss: 1.0407699346542358
Epoch: 9 Loss: 1.0144894123077393
Epoch: 10 Loss: 1.0372117757797241
Test accuracy: 0.4464755227244272


In [14]:
#printing all the required output
from sklearn.metrics import precision_score, recall_score, balanced_accuracy_score,f1_score

with torch.no_grad():
    output = model(X_test_tensor)
    _, predicted = torch.max(output.data, 1)
    predicted = predicted.cpu()
    accuracy = (predicted == y_test_tensor).sum().item() / len(y_test_tensor)
    print(f'Test accuracy on {len(predicted)} points is {accuracy}')
    f1 = f1_score(y_test_tensor.cpu().numpy(), predicted.numpy(), average='weighted')
    precision = precision_score(y_test_tensor.cpu().numpy(), predicted.numpy(), average='weighted')
    recall = recall_score(y_test_tensor.cpu().numpy(), predicted.numpy(), average='weighted')
    balanced_accuracy = balanced_accuracy_score(y_test_tensor.cpu().numpy(), predicted.numpy())
    print(f'f1 score on {len(predicted)} points is {f1}')
    print(f'Test precision on {len(predicted)} points is {precision}')
    print(f'Test recall on {len(predicted)} points is {recall}')
    print(f'Test balanced accuracy on {len(predicted)} points is {balanced_accuracy}')

Test accuracy on 7987 points is 0.4464755227244272
f1 score on 7987 points is 0.4003987378161625
Test precision on 7987 points is 0.3629972810573795
Test recall on 7987 points is 0.4464755227244272
Test balanced accuracy on 7987 points is 0.3528649921507065


  _warn_prf(average, modifier, msg_start, len(result))
