In [1]:
import torch
import torchvision
import numpy as np
import matplotlib.pyplot as plt

from sklearn import metrics
from elpv_dataset.utils.elpv_reader import load_dataset

from torchvision.transforms import v2
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn

In [2]:
transform = v2.Compose([
    v2.RandomVerticalFlip(0.5),
    v2.RandomHorizontalFlip(0.5),
])

transform_base = v2.Compose([
    # v2.Grayscale(num_output_channels=3),
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Resize((24,24), antialias=None),
])

In [3]:
class ELPV_Dataset(Dataset):
    def __init__(self, transform=None):
        images, labels, types = load_dataset()
        self.images = np.reshape(images, (np.shape(images)[0], np.shape(images)[1], np.shape(images)[2], 1))
        
        self.labels = (labels*3).astype(np.uint8)
        self.types = types
        self.transform = transform
        
    def __getitem__(self, index):
        image = self.images[index]
        label = self.labels[index]

        if self.transform:
            
            image = self.transform(image)
            
        return image, label

    def __len__(self):
        return (len(self.images))

data = ELPV_Dataset(transform_base)

In [4]:
batch_size = 128

val_size = int(len(data)*(1/4))
train_size = len(data) - val_size
print(f"Training Data: {train_size} Validation Data: {val_size}")

train_data, val_data = torch.utils.data.random_split(data,[train_size,val_size])

label_tensor = torch.Tensor([label for (image, label) in train_data]).int()
train_sampler = torch.utils.data.sampler.WeightedRandomSampler([(1/torch.bincount(label_tensor)[label].item()) for (image, label) in train_data],len(train_data))
label_tensor = torch.Tensor([label for (image, label) in val_data]).int()
val_sampler = torch.utils.data.sampler.WeightedRandomSampler([(1/torch.bincount(label_tensor)[label].item()) for (image, label) in val_data],len(val_data))

train_dl = DataLoader(train_data, batch_size=batch_size, sampler=train_sampler)
val_dl = DataLoader(val_data, batch_size=batch_size, shuffle=True)

# for images, labels in val_dl:
#     print(torch.bincount(labels))

Training Data: 1968 Validation Data: 656


In [5]:
# i = np.random.randint(1,200)
print(np.shape(data[0][0]))
# print(data[i][0])
# # images, labels, types = load_dataset()

# # print(transform(images[0]))
# plt.imshow(data[i][0].permute(1,2,0))
# plt.show()

torch.Size([1, 24, 24])


In [6]:
class VGG7(nn.Module):
    def __init__(self):
        super().__init__()
        self.network = nn.Sequential(
            nn.Conv2d(1, 1, (3,3), padding=1),
            nn.BatchNorm2d(1),
            nn.ReLU(),
            nn.Conv2d(1, 1, (3,3), padding=1),
            nn.BatchNorm2d(1),
            nn.ReLU(),

            nn.MaxPool2d(2,2),
            
            nn.Conv2d(1, 2, (3,3), padding=1),
            nn.BatchNorm2d(2),
            nn.ReLU(),
            nn.Conv2d(2, 2, (3,3), padding=1),
            nn.BatchNorm2d(2),
            nn.ReLU(),

            nn.MaxPool2d(2,2),
            
            nn.Conv2d(2, 512, (3,3), padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Conv2d(512, 256, (3,3), padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 4, (3,3), padding=1),
            nn.BatchNorm2d(4),
            nn.ReLU(),

            nn.MaxPool2d(12,12),
            
            nn.Flatten(),

            nn.Linear(4,4),
            nn.LogSoftmax(-1),
        )
    def forward(self, x):
        output = self.network(x)
        return output

class ResNet18(nn.Module):
    def __init__(self):
        super().__init__()
        multiplier = 4
        self.input = nn.Sequential(
            nn.Conv2d(1, 1*multiplier, (3,3), padding=1),
            nn.ReLU(),
        )
        self.block1 = self.block(1*multiplier,1*multiplier)
        self.relu1 = nn.ReLU()
        self.block2 = self.block(1*multiplier,1*multiplier)
        self.relu2 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(2,2)
        self.block3 = self.block(1*multiplier,2*multiplier)
        self.relu3 = nn.ReLU()
        self.block4 = self.block(2*multiplier,2*multiplier)
        self.relu4 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(2,2)
        self.block5 = self.block(2*multiplier,4*multiplier)
        self.relu5 = nn.ReLU()
        self.block6 = self.block(4*multiplier,4*multiplier)
        self.relu6 = nn.ReLU()
        self.maxpool3 = nn.MaxPool2d(2,2)
        self.block7 = self.block(4*multiplier,8*multiplier)
        self.relu7 = nn.ReLU()
        self.block8 = self.block(8*multiplier,8*multiplier)
        self.relu8 = nn.ReLU()
        self.globalmaxpool = nn.AvgPool2d(2,2)
        self.flatten = nn.Flatten()
        self.fullyconnected = nn.Linear(8*multiplier, 4)
        self.logsoftmax = nn.LogSoftmax(-1)
        
        
        self.dropout = nn.Dropout2d(0)
 
    def block(self, input_channels, output_channels):
        output = nn.Sequential(
            nn.Conv2d(input_channels, output_channels, (3,3), padding=1),
            nn.BatchNorm2d(output_channels),
            nn.ReLU(),
            nn.Conv2d(output_channels, output_channels, (3,3), padding=1),
            nn.BatchNorm2d(output_channels),
        )
        return output

    def process_block(self, input, block, sample=None):
        result = block(input)

        if sample:
            input = self.sample(input)
        output = (input + result)

        return output

    def sample(self, input):
        in_c = int(input.size()[1])
        out_c = int(input.size()[1]*2)
        upsample = nn.Conv2d(in_c, out_c, (3,3), padding=1)
        upsample.to('cuda')
        output = upsample(input)
        return output
        
    def forward(self,x):
        x = self.input(x)
        x = self.process_block(x, self.block1)
        x = self.relu1(x)
        x = self.process_block(x, self.block2)
        x = self.relu2(x)
        x = self.maxpool1(x)
        x = self.process_block(x, self.block3, True)
        x = self.relu3(x)
        x = self.process_block(x, self.block4)
        x = self.relu4(x)
        x = self.maxpool2(x)
        x = self.process_block(x, self.block5, True)
        x = self.relu5(x)
        x = self.process_block(x, self.block6)
        x = self.relu6(x)
        x = self.maxpool3(x)
        x = self.process_block(x, self.block7, True)
        x = self.relu7(x)
        x = self.process_block(x, self.block8)
        x = self.relu8(x)
        x = self.globalmaxpool(x)
        x = self.flatten(x)
        x = self.fullyconnected(x)
        x = self.logsoftmax(x)

        return x

model = ResNet18()
model = model.to('cuda')

In [7]:
model
     

ResNet18(
  (input): Sequential(
    (0): Conv2d(1, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
  )
  (block1): Sequential(
    (0): Conv2d(4, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(4, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (relu1): ReLU()
  (block2): Sequential(
    (0): Conv2d(4, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(4, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (relu2): ReLU()
  (maxpool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (block3): Seque

In [8]:
num_epochs = 5000

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

history = []

for epoch in range(num_epochs+1):
    model.train()

    loss_sum = 0
    batch_count = 0
    result = 0
    
    for i, (images, labels) in enumerate(train_dl):
        images = transform(images)
        images = images.to('cuda')
        labels = labels.to('cuda')
        
        outputs = model(images)
        losses = loss_fn(outputs, labels)
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        predictions = (torch.argmax(outputs, dim=1))
        results = predictions - labels
        result+= len(np.where(results.cpu() == 0)[0])

        loss_sum+= losses.item()
        batch_count+=1

    train_loss = round((loss_sum/batch_count),4)
    train_accuracy = round((result/(batch_count*batch_size))*100,2)

    model.eval()
    
    loss_sum = 0
    batch_count = 0
    result = 0
        
    for i, (images, labels) in enumerate(val_dl):
        images = images.to('cuda')
        labels = labels.to('cuda')
 
        outputs = model(images)
        losses = loss_fn(outputs, labels)
        predictions = (torch.argmax(outputs, dim=1))
        results = predictions - labels
        result+= len(np.where(results.cpu() == 0)[0])

        loss_sum+= losses.item()
        batch_count+=1

    val_loss = round((loss_sum/batch_count),4)
    val_accuracy = round((result/(batch_count*batch_size))*100,2)

    history.append({'train_loss':train_loss,'train_acc':train_accuracy,'val_loss':val_loss,'val_acc':val_accuracy})

    print("Epoch [{}]/[{}], train_loss: {:4f}, train_acc: {:.2f}%, val_loss: {:4f}, val_acc: {:.2f}%".format(
        epoch, num_epochs, train_loss, train_accuracy, val_loss, val_accuracy))


Epoch [0]/[5000], train_loss: 1.391600, train_acc: 26.71%, val_loss: 1.446200, val_acc: 10.29%
Epoch [1]/[5000], train_loss: 1.307600, train_acc: 34.33%, val_loss: 1.448900, val_acc: 10.29%
Epoch [2]/[5000], train_loss: 1.268500, train_acc: 37.35%, val_loss: 1.478900, val_acc: 24.35%
Epoch [3]/[5000], train_loss: 1.277000, train_acc: 36.62%, val_loss: 1.331200, val_acc: 43.75%
Epoch [4]/[5000], train_loss: 1.271800, train_acc: 38.33%, val_loss: 1.307900, val_acc: 35.16%
Epoch [5]/[5000], train_loss: 1.294500, train_acc: 35.84%, val_loss: 1.246500, val_acc: 38.67%
Epoch [6]/[5000], train_loss: 1.242300, train_acc: 40.82%, val_loss: 1.261000, val_acc: 31.77%
Epoch [7]/[5000], train_loss: 1.234500, train_acc: 40.04%, val_loss: 1.191500, val_acc: 32.16%
Epoch [8]/[5000], train_loss: 1.246100, train_acc: 40.04%, val_loss: 1.557000, val_acc: 26.43%
Epoch [9]/[5000], train_loss: 1.236900, train_acc: 40.43%, val_loss: 1.640400, val_acc: 24.09%
Epoch [10]/[5000], train_loss: 1.224500, train_acc

KeyboardInterrupt: 

In [None]:
def plot_losses(history):
    """ Plot the losses in each epoch"""
    train_losses = [x.get('train_loss') for x in history]
    val_losses = [x['val_loss'] for x in history]
    plt.plot(train_losses, '-b')
    plt.plot(val_losses, '-r')
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['Training', 'Validation'])
    plt.title('Loss vs. No. of epochs VGG');

plot_losses(history)

In [None]:
def plot_accuracies(history):
    """ Plot the history of accuracies"""
    train_acc = [x['train_acc'] for x in history]
    val_acc = [x['val_acc'] for x in history]
    plt.plot(train_acc, '-b')
    plt.plot(val_acc, '-r')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.legend(['Training', 'Validation'])
    plt.title('Accuracy vs. No. of epochs');
    
plot_accuracies(history)

In [None]:
model.eval()
model = model.to('cuda')

label_list = []
prediction_list = []

data = ELPV_Dataset(transform_base)
dataloader = DataLoader(data, batch_size=1, shuffle=False)

for image, label in val_data:
    # print(image)
    image = torch.reshape(image, (1, np.shape(image)[0], np.shape(image)[1], np.shape(image)[2]))
    image = image.to('cuda')
    result = model(image)
    result = result.to('cpu')
    
    prediction_list.append(torch.argmax(result).item())
    label_list.append(label.item())
    # print(result.item())

# print((label_list))
# print((prediction_list))

print(metrics.classification_report(label_list, prediction_list))
disp = metrics.ConfusionMatrixDisplay.from_predictions(label_list, prediction_list)
disp.figure_.suptitle("Confusion Matrix")
plt.show()
    