In [None]:
import os
import shutil
from pathlib import Path
from glob import glob
from PIL import Image
import random
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchmetrics import Accuracy
import torchvision
from torchvision import datasets, models, transforms
from torchvision.io import read_image
import pytorch_lightning as pl
from pytorch_lightning.loggers import TensorBoardLogger

In [None]:
def get_abs_path(n_parent: int = 0):
    """Return the absolute path `n_parent` levels above the working directory.

    With the default ``n_parent=0`` the working directory itself is returned.
    """
    # One '..' component per level; no components at all means '.'.
    climb = Path(*(['..'] * n_parent))
    return climb.resolve()

def split_dataset(data_path, train_path, validate_path, test_path):
    """Copy a class-per-folder dataset into train / validate / test folders.

    Every sub-directory of `data_path` is treated as one class. Its files are
    sorted by name and split in that order (no shuffling): the leading ~85% go
    to training, and the remaining ~15% are split again into ~70% test and
    ~30% validation. Files are copied into `<split root>/<class name>/`; the
    source dataset is left untouched.

    Args:
        data_path: `Path` of the source dataset root (one folder per class).
        train_path: `Path` of the training split root.
        validate_path: `Path` of the validation split root.
        test_path: `Path` of the test split root.
    """
    for class_dir in sorted(data_path.iterdir()):
        # Stray files next to the class folders (.DS_Store, README, ...) are
        # not classes; iterating into them would raise NotADirectoryError.
        if not class_dir.is_dir():
            continue

        # Only regular files can be copied; nested folders are ignored.
        class_images_paths = sorted(p for p in class_dir.iterdir() if p.is_file())

        # shuffle=False makes the split deterministic (ordered by file name),
        # so no random_state is needed.
        train_paths, holdout_paths = train_test_split(class_images_paths,
                                                      test_size=0.15,
                                                      shuffle=False)
        # The first part returned here is the test set, the second the
        # validation set.
        test_paths, validate_paths = train_test_split(holdout_paths,
                                                      test_size=0.30,
                                                      shuffle=False)

        for image_paths, split_root in ((train_paths, train_path),
                                        (validate_paths, validate_path),
                                        (test_paths, test_path)):
            target_dir = split_root / class_dir.name
            # parents=True: also works when the caller has not created the
            # split roots beforehand.
            target_dir.mkdir(parents=True, exist_ok=True)
            for image_path in image_paths:
                shutil.copy(image_path, target_dir)

In [None]:
def imshow(img, title='', mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Display a normalised channel-first image tensor with matplotlib.

    The tensor is un-normalised with ``pixel * std + mean`` (the inverse of
    ``transforms.Normalize(mean, std)``) and clipped to [0, 1] before it is
    drawn, so colours match the source image.

    Args:
        img: tensor laid out as (channels, height, width).
        title: text shown above the image.
        mean: per-channel mean that was used for normalisation.
        std: per-channel standard deviation that was used for normalisation.
    """
    npimg = img.detach().cpu().numpy()
    # matplotlib expects channels last.
    npimg = np.transpose(npimg, (1, 2, 0))
    # Undo Normalize; mean/std broadcast over the trailing channel axis.
    npimg = npimg * np.asarray(std) + np.asarray(mean)
    # Float images must lie in [0, 1] to be rendered without clipping warnings.
    npimg = np.clip(npimg, 0.0, 1.0)
    plt.imshow(npimg)
    plt.title(title)
    plt.show()

def display_test(dataloaders, data_transforms, class_names):
    """Show a few labelled samples from the test loader, then the batch as a grid.

    Args:
        dataloaders: mapping whose 'test' entry yields (images, labels) batches.
        data_transforms: unused; kept so existing calls keep working.
        class_names: sequence mapping a label index to its display name.
    """
    # The built-in next() works with every iterator; the `.next()` method is
    # not part of the iterator protocol and is missing on many iterators.
    ex_images, ex_labels = next(iter(dataloaders['test']))

    # Show at most five samples: a batch can be smaller than that.
    for i in range(min(5, len(ex_images))):
        imshow(ex_images[i], title=class_names[int(ex_labels[i])])

    plt.figure(figsize=(10, 6))
    imshow(torchvision.utils.make_grid(ex_images))

In [None]:
# Folder layout, anchored one level above the working directory:
#   data/                                  - source images, one folder per class
#   learning_data/{train,validate,test}/   - generated splits
path = get_abs_path(1)
data_path = path / 'data'
learning_path = path / 'learning_data/'
train_path, validate_path, test_path = (
    learning_path / split_name for split_name in ('train/', 'validate/', 'test/')
)

# Create the split roots first (safe to re-run), then fill them.
for split_root in (train_path, validate_path, test_path):
    split_root.mkdir(parents=True, exist_ok=True)

split_dataset(data_path, train_path, validate_path, test_path)

In [None]:
batch_size = 16
keys = ['train', 'validate', 'test']


def _resize_and_normalize():
    """Build the deterministic steps that end every split's pipeline."""
    return [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]


# Training only: one augmentation is picked at random for each sample.
# NOTE(review): RandomCrop((600, 600)) presumably needs source images of at
# least 600x600 - confirm against the dataset.
augmentation = transforms.RandomChoice([
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomPerspective(),
    transforms.RandomRotation(45),
    transforms.RandomCrop((600, 600)),
])

data_transforms = {
    'train': transforms.Compose([augmentation] + _resize_and_normalize()),
    'validate': transforms.Compose(_resize_and_normalize()),
    'test': transforms.Compose(_resize_and_normalize()),
}

# One ImageFolder dataset and one shuffled loader per split.
image_datasets = {}
dataloaders = {}
for split in keys:
    image_datasets[split] = datasets.ImageFolder(
        os.path.join(learning_path, split),
        data_transforms[split],
    )
    dataloaders[split] = torch.utils.data.DataLoader(
        image_datasets[split],
        batch_size=batch_size,
        shuffle=True,
        num_workers=8,
    )

dataset_sizes = {split: len(image_datasets[split]) for split in keys}

print('Train samples count:', dataset_sizes['train'])
print('Validate samples count:', dataset_sizes['validate'])
print('Test samples count:', dataset_sizes['test'])

# Class names come from the training folder names; displayed as the cell output.
class_names = image_datasets['train'].classes
class_names

In [None]:
# Visual sanity check: show a few labelled test samples, then the whole batch as a grid.
display_test(dataloaders, data_transforms, class_names)

In [None]:
class MyClassifier(pl.LightningModule):
    """ResNet-18 image classifier fine-tuned with a class-weighted cross-entropy loss.

    The pretrained network's final fully connected layer is replaced by a new
    linear layer with `num_classes` outputs. Loss and accuracy are logged as
    `<stage>_loss` / `<stage>_acc` for the train, val and test stages.

    Args:
        num_classes: number of output classes.
        class_weights: per-class loss weights, one entry per class, or None
            for an unweighted loss. The default is the five-class weighting
            this notebook was written with.

    Raises:
        ValueError: if `class_weights` does not contain exactly `num_classes`
            entries. Without this check the mismatch only surfaces as a shape
            error inside the loss during the first step.
    """

    def __init__(self, num_classes, class_weights=(1.0, 1.1, 1.1, 1.0, 0.9)):
        super().__init__()

        if class_weights is not None and len(class_weights) != num_classes:
            raise ValueError(
                'class_weights has {} entries but num_classes is {}'.format(
                    len(class_weights), num_classes))

        self.model = models.resnet18(pretrained=True)
        # Swap the classification head for one sized to this dataset.
        self.model.fc = nn.Linear(in_features=self.model.fc.in_features, out_features=num_classes)

        weight = None
        if class_weights is not None:
            weight = torch.as_tensor(class_weights, dtype=torch.float32)
        self.loss = nn.CrossEntropyLoss(weight=weight, reduction='mean')
        self.accuracy = Accuracy()

    def forward(self, x):
        """Return the raw class scores (logits) for a batch of images."""
        return self.model(x)

    def _shared_step(self, batch, stage):
        """Run one batch, log `<stage>_loss` and `<stage>_acc`, return the loss."""
        x, y = batch
        outputs = self(x)
        loss = self.loss(outputs, y)
        self.log('{}_loss'.format(stage), loss)
        self.log('{}_acc'.format(stage), self.accuracy(outputs, y), prog_bar=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss/accuracy for one batch."""
        return self._shared_step(batch, 'train')

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss/accuracy for one batch."""
        return self._shared_step(batch, 'val')

    def test_step(self, batch, batch_idx):
        """Compute and log the test loss/accuracy for one batch."""
        return self._shared_step(batch, 'test')

    def configure_optimizers(self):
        """Optimise all parameters with SGD (lr=1e-3, momentum=0.9)."""
        return torch.optim.SGD(self.parameters(), lr=1e-3, momentum=0.9)

In [None]:
# Build the classifier with one output per class folder found in the training set.
model = MyClassifier(num_classes=len(class_names))
# Optional (disabled): start from a previously saved checkpoint when the file exists.
# model_name = 'lightning_sgd_46.ckpt'
# if os.path.isfile(model_name):
#     model = model.load_from_checkpoint(checkpoint_path=model_name)

In [None]:
# TensorBoard logs go to runs/<name>/.
logger = TensorBoardLogger('runs', name='SGD e=35')
# Use one GPU when CUDA is available; otherwise leave `gpus` unset so the
# notebook also runs on CPU-only machines instead of erroring out.
trainer = pl.Trainer(max_epochs=35,
                     gpus=1 if torch.cuda.is_available() else None,
                     logger=logger)
trainer.fit(model, train_dataloaders=dataloaders['train'], val_dataloaders=dataloaders['validate'])

In [None]:
# Evaluate the model on the test loader; test_step logs test_loss and test_acc.
trainer.test(model, dataloaders=dataloaders['test'])

In [None]:
# Write a checkpoint file, relative to the working directory.
# NOTE(review): the file name says 50 while the Trainer is configured with
# max_epochs=35 - confirm which one is intended.
trainer.save_checkpoint('lightning_sgd_50.ckpt')

In [None]:
# Collect the true and predicted class indices (plain ints) for the whole
# test set; the confusion-matrix cell consumes both lists.
true = []
predictions = []

# eval(): BatchNorm uses its stored running statistics instead of the current
# batch, and those statistics are not modified by running inference.
# no_grad(): no autograd graph is built, which saves memory and time.
model.eval()
device = next(model.parameters()).device
with torch.no_grad():
    for ex_images, ex_labels in dataloaders['test']:
        # Inputs must live on the same device as the model weights.
        results = model(ex_images.to(device))
        true.extend(ex_labels.tolist())
        predictions.extend(results.argmax(dim=1).tolist())

In [None]:
# Pin the label set so the matrix always has one row/column per class, even
# when a class never occurs in `true` or `predictions`; without it the matrix
# shrinks and its rows no longer line up with the class indices.
class_indices = list(range(len(class_names)))
cm = confusion_matrix(true, predictions, labels=class_indices)
# Label the axes with class names instead of bare indices.
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(xticks_rotation=45)
plt.show()

In [None]:
def process_filename(filename, class_names):
    """Infer the class of an image from its path.

    The folders on the path are checked from the innermost outwards, and the
    first one whose name is exactly a class name decides the label. This keeps
    a class name that merely occurs inside the file name, or inside an
    unrelated ancestor folder, from overriding the real class folder. If no
    folder matches, the previous rule applies: substring search over the whole
    path, where the last matching entry of `class_names` wins.

    Args:
        filename: path of the image (str or path-like); both '/' and '\\'
            are accepted as separators.
        class_names: sequence of class names; a name's position is its index.

    Returns:
        Tuple `(label_idx, label_name)`, or `(-1, '')` if no class is found.
    """
    text = str(filename)
    # Normalise separators so Windows-style paths are split on any OS.
    *directories, _basename = text.replace('\\', '/').split('/')

    for directory in reversed(directories):
        if directory in class_names:
            return class_names.index(directory), directory

    # Fallback for paths without a class folder (e.g. 'rose_001.jpg').
    label_idx, label_name = -1, ''
    for idx, name in enumerate(class_names):
        if name in text:
            label_idx, label_name = idx, name
    return label_idx, label_name

In [None]:
# Collect every file under learning_data/test/<class>/ in random order.
# Joining the parts with os.path.join uses the platform's own separator; a
# pattern with hard-coded backslashes only matches on Windows.
test_root = os.path.join(path, 'learning_data', 'test')
path_list = glob(os.path.join(test_root, '*', '*'))
if not path_list:
    raise FileNotFoundError('No test images found under {}'.format(test_root))
random.shuffle(path_list)

images_list = []
true_labels = []
pred_labels = []

input_tensors = []
for filename in path_list:
    # Decode inside a context manager so the file handle is released, and
    # convert to RGB because Normalize is configured for three channels.
    with Image.open(filename) as opened_image:
        input_image = opened_image.convert('RGB')
    input_tensors.append(data_transforms['test'](input_image))

    label_idx, label_name = process_filename(filename, class_names)
    true_labels.append(label_name)
    images_list.append(input_image)

# Stack once; growing the batch with torch.cat re-copies it on every iteration.
# NOTE: the whole test set is run as a single batch.
batch = torch.stack(input_tensors)

# eval() makes BatchNorm use its stored running statistics, so a prediction no
# longer depends on which images share the batch (a single image included);
# no_grad() avoids building an autograd graph for the whole batch.
model.eval()
with torch.no_grad():
    results = model(batch.to(next(model.parameters()).device))

# The arg-max of the logits equals the arg-max of the softmax probabilities.
pred_labels = [class_names[idx] for idx in results.argmax(dim=1).tolist()]

In [None]:
# MANUAL TEST
# Step through the first N loaded images. After each image is shown, an empty
# reply prints its true/predicted labels and advances; any other reply steps
# back one image.
images_count = int(input('Count of images:'))
# Never walk past the images that were actually loaded.
images_count = min(images_count, len(images_list))
i = 0
while i < images_count:
    image = images_list[i]
    image.show()
    reply = input()
    if reply != '':
        # Stop at the first image: a negative index would wrap around to the
        # end of the list and keep counting down.
        i = max(i - 1, 0)
        continue

    title = 'true: {}, predicted: {}'.format(true_labels[i], pred_labels[i])
    print(title)
    i += 1