<a href="https://colab.research.google.com/github/Strojove-uceni/2024-final-hlina-rules/blob/main/Resnet_a_custom_Data_preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [28]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader

import torchvision.transforms as transforms
from PIL import Image
import random
from torch.utils.data import Dataset, DataLoader

import random
import os
from sklearn.preprocessing import LabelEncoder
import numpy as np


#Data preprocessing
class RandomRotate:
    """Rotate an image by a randomly drawn angle.

    Args:
        degrees: Angle range forwarded to ``transforms.RandomRotation``.
            Defaults to ``(-10, 10)``.
    """

    def __init__(self, degrees=(-10, 10)):
        self.degrees = degrees
        # Build the torchvision transform once instead of on every call;
        # the transform object itself is reused for each image.
        self.transform = transforms.RandomRotation(degrees=self.degrees)

    def __call__(self, image):
        """Return ``image`` passed through the random-rotation transform."""
        return self.transform(image)


class RandomPerspectiveTransform:
    """Apply a random perspective distortion to a fraction of the images.

    Args:
        distortion_scale: Distortion strength forwarded to
            ``transforms.RandomPerspective``. Defaults to 0.5.
        p: Probability that an image is distorted at all. Defaults to 0.1.
    """

    def __init__(self, distortion_scale=0.5, p=0.1):
        self.distortion_scale = distortion_scale
        self.p = p

    def __call__(self, image):
        """Return a distorted image with probability ``p``, else ``image`` unchanged."""
        # Decide first, so the transform is only built when it is actually used.
        if random.random() >= self.p:
            return image
        # p=1.0 because the coin flip above has already made the decision.
        warp = transforms.RandomPerspective(distortion_scale=self.distortion_scale, p=1.0)
        return warp(image)

class CenteredCrop:
    """Cut a window around the nozzle position and rescale it to a square.

    Args:
        crop_size: Nominal side length (pixels) of the window around the nozzle.
        output_size: Side length (pixels) of the returned square image.
    """

    def __init__(self, crop_size=100, output_size=320):
        self.crop_size = crop_size
        self.output_size = output_size

    def __call__(self, image, nozzle_x, nozzle_y):
        """Return the region around ``(nozzle_x, nozzle_y)`` resized to ``output_size``.

        The window is clipped to the image bounds, so close to an edge the
        cropped region can be smaller than ``crop_size`` (and non-square)
        before it is stretched to the square output.
        """
        half = self.crop_size // 2
        box = (
            max(nozzle_x - half, 0),             # left
            max(nozzle_y - half, 0),             # upper
            min(nozzle_x + half, image.width),   # right
            min(nozzle_y + half, image.height),  # lower
        )
        target = (self.output_size, self.output_size)
        return image.crop(box).resize(target, Image.Resampling.LANCZOS)


class RandomCrop:
    """Take a randomly placed square crop (90-100 % of the height) and resize it.

    Args:
        output_size: Side length (pixels) of the returned square image.
    """

    def __init__(self, output_size=224):
        self.output_size = output_size

    def __call__(self, image):
        """Return a random square crop of ``image`` resized to ``output_size``."""
        # image.size is (width, height); the crop side is derived from the
        # height because the input is expected to be square at this stage.
        width, height = image.size
        side = int(height * random.uniform(0.9, 1.0))
        # Vertical offset is drawn first, then the horizontal one.
        y0 = random.randint(0, height - side)
        x0 = random.randint(0, width - side)
        window = image.crop((x0, y0, x0 + side, y0 + side))
        return window.resize((self.output_size, self.output_size), Image.Resampling.LANCZOS)


class HorizontalFlipColorJitter:
    """With probability ``p`` mirror the image and jitter its colours.

    Args:
        brightness: Forwarded to ``transforms.ColorJitter``. Defaults to 0.1.
        contrast: Forwarded to ``transforms.ColorJitter``. Defaults to 0.1.
        saturation: Forwarded to ``transforms.ColorJitter``. Defaults to 0.1.
        hue: Forwarded to ``transforms.ColorJitter``. Defaults to 0.1.
        p: Probability that the flip + jitter is applied. Defaults to 0.5.
    """

    def __init__(self, brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1, p=0.5):
        self.brightness = brightness
        self.contrast = contrast
        self.saturation = saturation
        self.hue = hue
        self.p = p

    def __call__(self, image):
        """Return the flipped and colour-jittered image, or ``image`` unchanged."""
        # Decide first: previously the flip and jitter were always computed
        # and then discarded whenever the image was returned untouched.
        if random.random() >= self.p:
            return image

        flipped = image.transpose(Image.FLIP_LEFT_RIGHT)
        # Built here (not in __init__) so later changes to the strength
        # attributes are still picked up on the next call.
        color_jitter = transforms.ColorJitter(
            brightness=self.brightness,
            contrast=self.contrast,
            saturation=self.saturation,
            hue=self.hue,
        )
        return color_jitter(flipped)


class TransformPipeline:
    """Run a list of transforms in order, feeding nozzle coordinates to crops.

    Args:
        pipeline: Iterable of callables. ``CenteredCrop`` steps are called with
            the image plus the ``nozzle_x`` / ``nozzle_y`` keyword values; every
            other step is called with the image only.
    """

    def __init__(self, pipeline):
        self.pipeline = pipeline

    def __call__(self, image, **kwargs):
        """Apply every step to ``image`` and return the final result."""
        result = image
        for step in self.pipeline:
            if not isinstance(step, CenteredCrop):
                result = step(result)
                continue
            # Only the nozzle-centred crop needs the extra coordinates.
            result = step(result, kwargs['nozzle_x'], kwargs['nozzle_y'])
        return result


class CustomDataset(Dataset):
    """Dataset of images with per-image labels and nozzle coordinates.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of labels, index-aligned with ``image_paths``.
        nozzle_coords: Sequence of ``(x, y)`` pairs, index-aligned with
            ``image_paths``.
        transform: Optional callable applied to each loaded image. A
            ``TransformPipeline`` additionally receives the nozzle coordinates
            as ``nozzle_x`` / ``nozzle_y`` keyword arguments.
    """

    def __init__(self, image_paths, labels, nozzle_coords, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.nozzle_coords = nozzle_coords
        self.transform = transform

    def __len__(self):
        """Return the number of samples (one per image path)."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``."""
        image_path = self.image_paths[idx]
        # Close the source file as soon as the RGB image has been produced;
        # the bare Image.open(...).convert(...) left the handle open.
        with Image.open(image_path) as source:
            image = source.convert('RGB')
        label = self.labels[idx]
        nozzle_x, nozzle_y = self.nozzle_coords[idx]

        if self.transform:
            if isinstance(self.transform, TransformPipeline):
                image = self.transform(image, nozzle_x=nozzle_x, nozzle_y=nozzle_y)
            else:
                image = self.transform(image)

        return image, label

# Prepare the train data
# Step order: rotate -> occasional perspective warp -> nozzle-centred crop
# -> random crop/resize to 224 -> occasional flip + colour jitter -> tensor.
transform_pipeline = TransformPipeline(
    [
        RandomRotate(),
        RandomPerspectiveTransform(),
        CenteredCrop(crop_size=100, output_size=320),
        RandomCrop(output_size=224),
        HorizontalFlipColorJitter(),
        transforms.ToTensor(),
    ]
)

# A single hard-coded sample; the three lists below are index-aligned.
path = '/content/image-6.jpg'
train_image_paths = [path]           # image file paths
train_labels = [[1, 1, 1, 1]]        # four label values per image
train_nozzle_coords = [(531, 554)]   # (x, y) nozzle position per image

train_dataset = CustomDataset(
    image_paths=train_image_paths,
    labels=train_labels,
    nozzle_coords=train_nozzle_coords,
    transform=transform_pipeline,
)
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)

########################################################################################################

# PRETRAINED RESNET

########################################################################################################


# Load the pretrained ResNet model
# `pretrained=True` is deprecated in torchvision >= 0.13; the explicit weights
# enum selects the ImageNet checkpoint that `pretrained=True` used to load.
resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

# Freeze the layers of the base model
resnet.requires_grad_(False)

# Modify the final layers to be able to assess the four criteria
class CustomResNet(nn.Module):
    """Backbone followed by four parallel three-way classification heads.

    The backbone's own ``fc`` layer is replaced by ``nn.Identity`` so the
    backbone returns its feature vector, which is fed to four independent
    linear heads (one per criterion).

    Args:
        base_model: Backbone module exposing an ``fc`` attribute. Note that
            its ``fc`` is replaced in place on the object passed in.
    """

    def __init__(self, base_model):
        super().__init__()
        self.base_model = base_model
        # Size the heads from the layer being removed rather than assuming
        # 2048 features; fall back to 2048 when `fc` has no `in_features`
        # (e.g. it was already replaced by an Identity).
        feature_dim = getattr(self.base_model.fc, 'in_features', 2048)
        self.base_model.fc = nn.Identity()  # Remove the original fully connected layer

        # Output layer for each of the 4 criteria - three outputs for each
        # criterion: 'high', 'low', 'good'
        self.fc1 = nn.Linear(feature_dim, 3)
        self.fc2 = nn.Linear(feature_dim, 3)
        self.fc3 = nn.Linear(feature_dim, 3)
        self.fc4 = nn.Linear(feature_dim, 3)

    def forward(self, x):
        """Return a 4-tuple with the outputs of ``fc1`` .. ``fc4`` for ``x``."""
        features = self.base_model(x)
        return (
            self.fc1(features),
            self.fc2(features),
            self.fc3(features),
            self.fc4(features),
        )

# Create instance of the custom model
model = CustomResNet(resnet)

lrn_rate = 0.001 #set learning rate

# loss function and optimizer
loss_f = nn.CrossEntropyLoss()
# Hand the optimizer only the parameters that are actually trained; the
# frozen backbone parameters never receive gradients.
optimizer = optim.Adam([p for p in model.parameters() if p.requires_grad], lr=lrn_rate)

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    # The backbone is frozen, so keep it in inference mode as well: in train
    # mode its BatchNorm layers would normalise with per-batch statistics and
    # keep updating their running statistics despite requires_grad=False.
    model.base_model.eval()
    running_loss = 0.0
    for inputs, labels in train_loader:

        # One label batch per criterion.
        labels1, labels2, labels3, labels4 = labels

        optimizer.zero_grad()

        # Forward pass
        outputs1, outputs2, outputs3, outputs4 = model(inputs)

        # Compute the loss (sum of the four per-criterion losses)
        loss1 = loss_f(outputs1, labels1)
        loss2 = loss_f(outputs2, labels2)
        loss3 = loss_f(outputs3, labels3)
        loss4 = loss_f(outputs4, labels4)
        loss = loss1 + loss2 + loss3 + loss4

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}')

# Save the trained model
torch.save(model.state_dict(), 'custom_resnet.pth')






Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Epoch [1/10], Loss: 4.309817314147949
Epoch [2/10], Loss: 1.02295982837677
Epoch [3/10], Loss: 0.22139587998390198
Epoch [4/10], Loss: 0.06105317175388336
Epoch [5/10], Loss: 0.019830303266644478
Epoch [6/10], Loss: 0.007371401414275169
Epoch [7/10], Loss: 0.0029464815743267536
Epoch [8/10], Loss: 0.0016646489966660738
Epoch [9/10], Loss: 0.0007822939660400152
Epoch [10/10], Loss: 0.0004088669375050813


In [31]:
# Prepare the test data
# Deterministic counterpart of the training preprocessing: crop the same
# nozzle-centred window and resize it to the 224x224 size the model was
# trained on, without any random augmentation. (Previously the full image was
# fed to the model and the nozzle coordinates were ignored.)
# The variable name is kept for compatibility with any later cells.
to_tensor_pipeline = TransformPipeline([
    CenteredCrop(crop_size=100, output_size=224),
    transforms.ToTensor()
])

test_image_paths = ['/content/image-15.jpg']  # List of test image paths
test_labels = [[1,1,1,1]]  # List of test labels for each criterion
test_nozzle_coords = [(531, 554)]  # List of test nozzle coordinates

test_dataset = CustomDataset(image_paths=test_image_paths, labels=test_labels, nozzle_coords=test_nozzle_coords, transform=to_tensor_pipeline)

test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Evaluation loop
# Switch to inference mode so BatchNorm layers use their running statistics
# and are not updated by the test data.
model.eval()
correct1, correct2, correct3, correct4 = 0, 0, 0, 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        # One label batch per criterion, same layout as in training.
        labels1, labels2, labels3, labels4 = labels
        outputs1, outputs2, outputs3, outputs4 = model(inputs)
        # Predicted class = index of the largest output per sample.
        predicted1 = outputs1.argmax(dim=1)
        predicted2 = outputs2.argmax(dim=1)
        predicted3 = outputs3.argmax(dim=1)
        predicted4 = outputs4.argmax(dim=1)
        total += labels1.size(0)
        correct1 += (predicted1 == labels1).sum().item()
        correct2 += (predicted2 == labels2).sum().item()
        correct3 += (predicted3 == labels3).sum().item()
        correct4 += (predicted4 == labels4).sum().item()

print(f'Accuracy for criterion 1: {100 * correct1 / total}%')
print(f'Accuracy for criterion 2: {100 * correct2 / total}%')
print(f'Accuracy for criterion 3: {100 * correct3 / total}%')
print(f'Accuracy for criterion 4: {100 * correct4 / total}%')

Accuracy for criterion 1: 100.0%
Accuracy for criterion 2: 100.0%
Accuracy for criterion 3: 100.0%
Accuracy for criterion 4: 100.0%


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
True
