In [None]:
pip install openimages

In [None]:
pip install torch torchvision

In [19]:
import os
import glob
import PIL
import matplotlib.pyplot as plt
import numpy as np

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from openimages.download import download_dataset
from PIL import Image

from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
import seaborn as sns

Defining transformations

In [20]:
# Height and width every image is resized / cropped to before it becomes a tensor.
IMAGE_SIZE = (128, 128)

# Training pipeline: random augmentation followed by conversion to a tensor.
# Padding by 2 px and then randomly cropping back to IMAGE_SIZE shifts the image
# by up to 2 px in each direction.
transformTrain = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.Resize(IMAGE_SIZE),
        transforms.Pad(2),
        transforms.RandomCrop(IMAGE_SIZE),
        transforms.RandomRotation(10),
        transforms.ToTensor(),
    ]
)

# Test pipeline: deterministic, only resize and convert to a tensor.
transformTest = transforms.Compose(
    [
        transforms.Resize(IMAGE_SIZE),
        transforms.ToTensor(),
    ]
)

Loading datasets

In [21]:
data_dir = "data"
number_for_samples = 500
classes = ["Orange", "Umbrella", "Strawberry"]

# A class counts as downloaded only when at least one image is present in
# <data_dir>/<lower-cased class name>/images/ (the layout the notebook globs later on).
# Checking per class, instead of only checking that `data_dir` exists, means an
# interrupted download or an empty pre-existing directory is repaired on the next run.
missing_classes = [
    c for c in classes
    if not glob.glob(os.path.join(data_dir, c.lower(), "images", "*.jpg"))
]

if missing_classes:
    os.makedirs(data_dir, exist_ok=True)
    download_dataset(data_dir, missing_classes, limit=number_for_samples)
else:
    print("Dataset already exists")

Dataset already exists


We build a dictionary that maps each class name to the list of image file paths found for that class

In [22]:
images_dir = "./data"

# Map every class name to the list of its image paths.
# glob returns paths in an arbitrary, filesystem-dependent order; sorting makes the
# positional train/test split that follows reproducible across runs and machines.
files = {
    c: sorted(glob.glob(images_dir + "/{}/images/*.jpg".format(c.lower())))
    for c in classes
}

Splitting each class's list of files into train files (the first 80%) and test files (the last 20%)

In [23]:
# Split every class at a single index so the two parts are complementary:
# the first 80% of the files go to training, the rest to testing.
# (Slicing the test part with a negative length breaks for small classes:
# `[-0:]` is the whole list, which would put every training file into the test set.)
trainFiles = {}
testFiles = {}
for c in classes:
    split_at = int(len(files[c]) * 0.8)
    trainFiles[c] = files[c][:split_at]
    testFiles[c] = files[c][split_at:]

In [24]:
class CustomDataset(Dataset):
    """Image-classification dataset built from a ``{class name: [file paths]}`` dict.

    The integer label of a sample is the position of its class in ``files``
    (first key -> 0, second key -> 1, ...), for any number of classes. Pass the
    dict with its keys in the same order as the notebook's ``classes`` list so that
    ``classes[label]`` names the class.

    Args:
        files: dict mapping a class name to the list of image paths of that class.
        transform: callable applied to the RGB ``PIL.Image`` of every sample.

    Attributes:
        files: the dict passed in.
        transform: the transform passed in.
        length: dict mapping a class name to its number of files.
        order: the random permutation applied to the samples.
        all_files: shuffled list of all image paths.
        labels: list of integer class indices, aligned with ``all_files``.
    """

    def __init__(self, files, transform):
        self.files = files
        self.transform = transform

        # get each class lengths
        self.length = {c: len(paths) for c, paths in files.items()}

        # flatten the per-class lists and give every file the index of its class
        all_files = []
        labels = []
        for label, paths in enumerate(files.values()):
            all_files.extend(paths)
            labels.extend([label] * len(paths))

        # shuffle data (files and labels with the same permutation)
        self.order = list(np.random.permutation(len(all_files)))
        self.all_files = [all_files[x] for x in self.order]
        self.labels = [labels[x] for x in self.order]

    def __len__(self):
        """Return the total number of samples over all classes."""
        return len(self.all_files)

    def __getitem__(self, i):
        """Load sample ``i`` and return ``(transformed image, integer label)``."""
        # The context manager closes the underlying file as soon as the pixel data
        # has been copied by convert(), so file handles are not left open.
        with Image.open(self.all_files[i]) as image:
            rgb_image = image.convert('RGB')

        return (self.transform(rgb_image), self.labels[i])

Creating the train and test datasets and their dataloaders

In [25]:
# Datasets: augmented transforms for training, deterministic transforms for testing.
train_dataset = CustomDataset(files=trainFiles, transform=transformTrain)
test_dataset = CustomDataset(files=testFiles, transform=transformTest)

num_workers = 2
batch_size = 8

# Both loaders share batch size and worker count; only the training loader reshuffles.
loader_options = dict(batch_size=batch_size, num_workers=num_workers)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

In [None]:
# Preview the first training sample. The tensor is channel-first (C, H, W) while
# imshow expects channel-last (H, W, C), hence the axis reordering.
sample_tensor, _ = train_dataset[0]
img = sample_tensor.numpy()
plt.imshow(np.transpose(img, (1, 2, 0)))

In [None]:
class CNN(nn.Module):
    """Small convolutional classifier for 3-channel 128x128 images.

    Architecture: conv(5x5) -> ReLU -> max-pool -> conv(5x5) -> ReLU -> max-pool
    -> three fully connected layers. The network returns raw scores (logits);
    no softmax is applied in ``forward``.

    Args:
        classes_count: number of output classes, i.e. the size of the final layer.
    """

    def __init__(self, classes_count):
        super().__init__()

        # Conv2d(in_channels, out_channels, kernel_size):
        #   3  input channels (RGB),
        #   16 output channels (feature maps, one per learned filter),
        #   5x5 convolution kernel.
        self.conv1 = nn.Conv2d(3, 16, 5)
        # The 16 output channels of conv1 are the input channels of conv2.
        self.conv2 = nn.Conv2d(16, 32, 5)

        # Max pooling over a 2x2 window with stride 2: halves width and height.
        self.pool = nn.MaxPool2d(2, 2)

        # Fully connected layers.
        # Spatial size for a 128x128 input:
        #   after conv1:   124 = 128 - 5 + 1
        #   after maxpool:  62 = (124 - 2) / 2 + 1
        #   after conv2:    58 = 62 - 5 + 1
        #   after maxpool:  29 = (58 - 2) / 2 + 1
        # so fc1 receives 32 * 29 * 29 features (32 = output channels of conv2).
        self.fc1 = nn.Linear(32 * 29 * 29, 120)
        self.fc2 = nn.Linear(120, 84)
        # Final layer: one output score per class.
        self.fc3 = nn.Linear(84, classes_count)

    def forward(self, x):
        """Return the class scores (logits) for a batch ``x`` of shape (N, 3, 128, 128).

        Raises:
            RuntimeError: if the spatial size of ``x`` does not produce the
                32 * 29 * 29 features expected by ``fc1``.
        """
        x = self.pool(F.relu(self.conv1(x)))  # relu for non-linearity
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten everything except the batch dimension. Unlike view(-1, 32 * 29 * 29),
        # this can never fold extra features into the batch dimension: an input of the
        # wrong size fails loudly in fc1 instead of silently yielding more rows.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Derive the size of the output layer from the class list instead of repeating the
# number, so the model stays consistent if `classes` changes.
num_classes = len(classes)
myNet = CNN(num_classes)
print(myNet)

Defining helper methods

In [30]:
def set_to_device(data, device):
    """Lazily move every element of ``data`` to ``device``.

    Args:
        data: iterable of objects exposing ``.to(device)`` (e.g. an images/labels batch).
        device: target passed unchanged to each element's ``.to``.

    Returns:
        A generator yielding ``element.to(device)`` for each element, in order.
        It can be consumed once, typically by unpacking: ``images, labels = ...``.
    """
    elements = iter(data)

    def _moved():
        for element in elements:
            yield element.to(device)

    return _moved()

In [29]:
def get_acc(outputs, labels):
    """Count the correct predictions in a batch.

    Args:
        outputs: tensor of per-class scores; the predicted class of a row is the
            index of its highest score along dimension 1.
        labels: tensor with the true class index of every row.

    Returns:
        A 0-dim float tensor holding the number of rows whose predicted class
        equals the label (a count, not a ratio).
    """
    with torch.no_grad():
        predicted = outputs.argmax(dim=1)
        hits = predicted.eq(labels)
        return hits.float().sum()
    
# outputs returns a list of predictions for an image to belong to a class, so an array of 3 elements
# outputsLabels picks the index of the highest prediction (which class it belongs to)
# outputs contains 8 arrays, because we load data in batches of 8 (I set that above)

# outputs  tensor([[-0.8575,  1.3114, -1.0110],
#         [-4.8837,  0.3225,  2.8092],
#         [-0.8008,  0.6337, -0.3348],
#         [ 1.7059,  1.5647, -3.2893],
#         [-1.0364,  2.8903, -2.9380],
#         [-1.0849,  2.4910, -2.3542],
#         [-2.2653,  4.8638, -4.3033],
#         [ 2.0741,  2.0949, -4.8366]], device='cuda:0')
# outputLabels tensor([1, 2, 1, 0, 1, 1, 1, 1], device='cuda:0')

# softmax converts the raw values into predictions/probabilities; for the accuracy calculation
# neither softmax nor sigmoid is needed, because the index of the largest raw value is the same.
# We do not apply softmax in CNN.forward() because nn.CrossEntropyLoss() accepts the raw values
# and applies softmax internally; the accuracy is computed from those same raw outputs.

In [33]:
# Use the GPU when one is available and fall back to the CPU otherwise,
# the same rule the prediction endpoint of this notebook applies.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Dataset sizes, used to turn summed losses / correct counts into per-sample averages.
N_train = len(train_dataset)
N_test = len(test_dataset)


def train(network, n_epochs, lr):
    """Train ``network`` and report train/test loss and accuracy after every epoch.

    Uses the notebook-level ``train_loader``, ``test_loader``, ``DEVICE``,
    ``N_train`` and ``N_test``. The network is moved to ``DEVICE``, optimised with
    Adam on the cross-entropy loss, and its parameters are saved to
    ``trained_model_parameters.pth`` when training finishes.

    Args:
        network: the model to train; it is updated in place.
        n_epochs: number of passes over the training data.
        lr: learning rate of the Adam optimizer.
    """
    network.to(DEVICE)

    optimizer = torch.optim.Adam(network.parameters(), lr=lr)  # updates the model parameters
    criterion = nn.CrossEntropyLoss()  # expects raw scores and integer class indices

    for epoch in range(n_epochs):
        train_loss = 0.0
        train_acc = 0.0
        test_loss = 0.0
        test_acc = 0.0

        network.train()
        for data in train_loader:
            images, labels = set_to_device(data, DEVICE)

            optimizer.zero_grad()  # clear gradients from the previous step
            # Forward pass through the network that was passed in (not a global model).
            outputs = network(images)

            loss = criterion(outputs, labels.long())

            # loss is the batch mean; weight it by the batch size so that dividing
            # by the dataset size below gives the per-sample average.
            train_loss += loss.item() * images.size(0)
            train_acc += float(get_acc(outputs, labels))

            loss.backward()  # compute the gradient of the loss for every parameter
            optimizer.step()  # update the parameters using those gradients

        # Evaluation: no gradients and no parameter updates are needed.
        network.eval()
        with torch.no_grad():
            for data in test_loader:
                images, labels = set_to_device(data, DEVICE)
                outputs = network(images)

                loss = criterion(outputs, labels.long())

                test_loss += loss.item() * images.size(0)
                test_acc += float(get_acc(outputs, labels))

        print('Epoch: {} | Train Loss: {:.6f} | Train Acc: {:.3f} | Test Loss: {:.6f} | Test Acc: {:.3f}'.format(
            epoch,
            train_loss / N_train,
            train_acc / N_train,
            test_loss / N_test,
            test_acc / N_test,
            ))

    # Wait for all queued GPU work to finish; skipped on machines without CUDA,
    # where the call would raise.
    if torch.cuda.is_available():
        torch.cuda.synchronize()

    print('Finished Training')
    torch.save(network.state_dict(), 'trained_model_parameters.pth')

In [None]:
# Three consecutive training stages, each with a smaller learning rate: (epochs, learning rate).
for stage_epochs, stage_lr in ((30, 1e-3), (20, 1e-4), (10, 1e-5)):
    train(myNet, stage_epochs, stage_lr)

# Persist the final parameters of the trained model.
torch.save(myNet.state_dict(), 'trained_model_parameters.pth')

# statistics


In [None]:
# Collect the true and the predicted class of every test sample.
true_values = []
predicted_values = []

with torch.no_grad():
    for batch in test_loader:
        images, labels = set_to_device(batch, DEVICE)
        outputs = myNet(images)
        # the predicted class is the index of the highest score in each row
        predicted = outputs.argmax(dim=1)
        true_values.extend(labels.cpu().numpy())
        predicted_values.extend(predicted.cpu().numpy())

# Recall, precision and F1 are averaged over the classes, weighted by class support.
weighted = dict(average='weighted')

accuracy = accuracy_score(y_true=true_values, y_pred=predicted_values)
print(f"Accuracy: {accuracy:.5f}")

recall = recall_score(y_true=true_values, y_pred=predicted_values, **weighted)
print(f"Recall: {recall:.5f}")

precision = precision_score(y_true=true_values, y_pred=predicted_values, **weighted)
print(f"Precision: {precision:.5f}")

f1 = f1_score(y_true=true_values, y_pred=predicted_values, **weighted)
print(f"F1 Score: {f1:.5f}")

In [74]:
def printConfusionMatrix(conf_matrix, classes):
    """Draw a confusion matrix as an annotated heatmap and show it.

    Args:
        conf_matrix: square matrix of integer counts (rows: true labels,
            columns: predicted labels, as the axis titles state).
        classes: class names used as tick labels on both axes.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(
        conf_matrix,
        annot=True,
        fmt="d",
        cmap="Reds",
        cbar=False,
        xticklabels=classes,
        yticklabels=classes,
        ax=ax,
    )
    ax.set_xlabel("Predicted Labels")
    ax.set_ylabel("True Labels")
    ax.set_title("Confusion Matrix")
    plt.show()

In [None]:
# Confusion matrix of the test-set predictions collected above, drawn as a heatmap.
conf_matrix = confusion_matrix(y_true=true_values, y_pred=predicted_values)
printConfusionMatrix(conf_matrix=conf_matrix, classes=classes)

# UI

In [None]:
from google.colab.output import eval_js

# Ask Colab for the public proxy address that forwards to port 5000 of this
# runtime, and print it.
proxy_url = eval_js("google.colab.kernel.proxyPort(5000)")
print(proxy_url)

In [None]:
from flask import Flask, render_template, request
# Flask application; HTML templates are looked up in the current directory.
app = Flask(import_name=__name__, template_folder="./")

@app.route("/")
def home():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template("index.html")
    return page

@app.route('/calculate_scores', methods=['POST'])
def calculate_scores():
    """Classify the image uploaded in the ``image`` form field.

    Returns:
        ``"Predicted class: <name>"`` on success, or a ``(message, 400)`` pair when
        no file was uploaded or the upload cannot be read as an image.
    """
    if 'image' not in request.files:
        return "No image uploaded", 400

    image_file = request.files['image']
    try:
        # Uploads may be grayscale, palette-based or carry an alpha channel; the
        # network expects exactly 3 channels, so always convert to RGB.
        image = Image.open(image_file.stream).convert('RGB')
    except OSError:
        # Raised by Pillow for files it cannot identify or decode.
        return "Uploaded file is not a valid image", 400

    # Same preprocessing as the test set, plus a leading batch dimension of 1.
    input_tensor = transformTest(image).unsqueeze(0)

    # Run on whichever device the model currently lives on, so input and weights
    # can never end up on different devices.
    device = next(myNet.parameters()).device
    input_tensor = input_tensor.to(device)

    with torch.no_grad():
        predictions = myNet(input_tensor)
        # index of the highest score = predicted class, as a plain Python int
        predicted_class = predictions.argmax(dim=1).item()
        predicted_label = classes[predicted_class]

    return "Predicted class: " + predicted_label

# Start Flask's built-in development server when this code runs as the main module;
# the call blocks until the server is stopped.
if __name__ == "__main__":
    app.run()