In [None]:
import shutil

# Wipe the /root/fiftyone directory (presumably FiftyOne's local state - confirm).
# ignore_errors=True keeps the cell re-runnable on a fresh runtime where the
# directory does not exist yet; a plain rmtree raises FileNotFoundError there.
shutil.rmtree('/root/fiftyone', ignore_errors=True)

In [1]:
# Mount Google Drive at /content/drive; the dataset and checkpoint paths used
# later in this notebook all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%cd drive/MyDrive

/content/drive/MyDrive


In [None]:
!pip install fiftyone
!pip install tensorflow torch torchvision umap-learn
!pip install 'ipywidgets>=8,<9'"]}

import fiftyone as fo
import fiftyone.zoo as foz
import fiftyone.utils.openimages as fouo

In [3]:
import torch, torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms.functional as transform
import numpy as np
import os
import math
import random
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import f1_score, jaccard_score
from torchvision import models
from torch.nn.functional import relu, softmax
from datetime import datetime
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets
from matplotlib.colors import ListedColormap

In [4]:
def seconds_to_time(seconds):
    """Format a duration in seconds as a compact ``XhYmZs`` string.

    The fractional part is truncated. The minute and hour components are only
    emitted once the duration reaches them, e.g. ``59 -> '59s'``,
    ``61 -> '1m1s'`` and ``3661 -> '1h1m1s'``.
    """
    total_minutes, secs = divmod(int(seconds), 60)
    if total_minutes < 1:
        return f'{secs}s'

    hours, minutes = divmod(total_minutes, 60)
    if hours < 1:
        return f'{minutes}m{secs}s'
    return f'{hours}h{minutes}m{secs}s'

In [5]:
# Classes to segment. Order matters: CustomDataset assigns label i + 1 to the
# i-th entry of this list, which leaves label 0 for background pixels.
class_list = ["Lemon", "Orange", "Strawberry"]

In [None]:
# Dataset download
def download_segmentation_dataset(dataset_name, split, classes, max_samples,
                                  export_root="/content/drive/MyDrive/Colab Notebooks/Lab3/Images"):
    """Download one zoo-dataset subset per class and export it as image/mask folders.

    Args:
        dataset_name: FiftyOne zoo dataset identifier passed to ``load_zoo_dataset``.
        split: Dataset split to load; also used as a sub-folder of the export path.
        classes: Iterable of class names; each one is loaded and exported separately.
        max_samples: Upper bound on the number of samples loaded per class.
        export_root: Directory under which ``<split>/<class>`` folders are written.
            Defaults to the location the rest of the notebook reads from.
    """
    for class_name in classes:
        print(class_name)
        # The class is part of the FiftyOne dataset name on purpose: load_zoo_dataset
        # returns an already existing dataset with the same name instead of loading
        # the requested class, so a shared name would export the first class's
        # samples into every class folder.
        custom_dataset_name = f"{dataset_name}_{split}_{class_name}_{max_samples}"

        dataset = foz.load_zoo_dataset(
            dataset_name,
            split=split,
            classes=[class_name],
            num_workers=4,
            label_types=["segmentations"],
            max_samples=max_samples,
            dataset_name=custom_dataset_name
        )

        export_dir = f"{export_root}/{split}/{class_name}"

        dataset.export(
            export_dir=export_dir,
            dataset_type=fo.types.ImageSegmentationDirectory,
            label_field="ground_truth"
        )

# Fetch the training subset first, then the test subset, with a per-class sample cap.
for split_name, sample_limit in (("train", 600), ("test", 150)):
    download_segmentation_dataset("open-images-v7", split_name, class_list, sample_limit)

In [6]:
class CustomDataset(torch.utils.data.Dataset):
    """Image/mask pairs for semantic segmentation with one class per image folder.

    Images are read from ``<root_dir>/<data_type>/<class_name>/data/*.jpg`` and the
    mask for each image is looked up at
    ``<root_dir>/<data_type>/<class_name>/labels/<stem>.png``. Every non-zero mask
    pixel is labelled with the index of the class folder the image came from
    (1-based, in ``class_list`` order); 0 is background.

    Args:
        root_dir: Directory containing the ``<data_type>`` sub-folders.
        data_types: Iterable of split folder names to include (e.g. ``["train"]``).
        class_list: Class folder names; position ``i`` maps to label ``i + 1``.
        is_train: When True, random flips and colour jitter are applied.
    """

    def __init__(self, root_dir, data_types, class_list, is_train=False):
        self.is_train = is_train
        self.img_list = []   # (image path, class index) tuples
        self.mask_list = []  # mask paths, index-aligned with img_list

        # Assign 1-based indices to the classes (0 is reserved for background)
        self.classes = {class_name: i + 1 for i, class_name in enumerate(class_list)}

        # Collect the images and their masks
        for data_type in data_types:
            for class_name in class_list:
                img_directory = os.path.join(root_dir, f"{data_type}/{class_name}/data")
                mask_directory = os.path.join(root_dir, f"{data_type}/{class_name}/labels")

                # os.listdir returns entries in arbitrary order; sorting makes the
                # sample indices reproducible between runs and machines.
                for filename in sorted(os.listdir(img_directory)):
                    if not filename.endswith('.jpg'):
                        continue
                    self.img_list.append((os.path.join(img_directory, filename), self.classes[class_name]))
                    mask_name = os.path.splitext(filename)[0] + '.png'
                    self.mask_list.append(os.path.join(mask_directory, mask_name))

    def __len__(self):
        """Return the number of images."""
        return len(self.img_list)

    def transform(self, img, mask, class_idx):
        """Resize, optionally augment, and convert an image/mask pair to tensors.

        Args:
            img: RGB PIL image.
            mask: Single-channel PIL mask; non-zero pixels mark the object.
            class_idx: Label written into every non-zero mask pixel.

        Returns:
            ``(img, mask)``: a normalised float image tensor of shape (3, 256, 256)
            and an int64 mask tensor of shape (256, 256) holding 0 or ``class_idx``.
        """
        image_resize = transforms.Resize((256, 256))
        # Nearest-neighbour keeps mask values exact. The default bilinear filter
        # blends across object borders, and because any value > 0 is treated as
        # foreground below, that would enlarge every object.
        mask_resize = transforms.Resize((256, 256), interpolation=transforms.InterpolationMode.NEAREST)
        to_tensor_transform = transforms.ToTensor()
        normalize_transform = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        color_jitter_transform = transforms.ColorJitter(brightness=0.3, contrast=0.2, saturation=0.2, hue=0.1)

        img = image_resize(img)
        mask = mask_resize(mask)

        if self.is_train:
            # Horizontal flip (applied to image and mask together)
            if random.random() > 0.5:
                img = transform.hflip(img)
                mask = transform.hflip(mask)

            # Vertical flip (applied to image and mask together)
            if random.random() > 0.5:
                img = transform.vflip(img)
                mask = transform.vflip(mask)

            # Colour augmentation changes the image only, never the labels
            if random.random() > 0.5:
                img = color_jitter_transform(img)

        img = to_tensor_transform(img)
        img = normalize_transform(img)
        mask = to_tensor_transform(mask).squeeze(0)

        # Index the mask pixels: 0 is background, anything else becomes class_idx.
        # Building the result from the boolean mask avoids mixing a float tensor
        # with an integer scalar tensor in torch.where.
        mask = (mask > 0).long() * class_idx

        return img, mask

    def __getitem__(self, idx):
        """Return ``(image path, image tensor, mask tensor)`` for sample ``idx``."""
        img_name, class_idx = self.img_list[idx]
        mask_name = self.mask_list[idx]
        img = Image.open(img_name).convert('RGB')
        mask = Image.open(mask_name).convert('L')

        transformed_img, transformed_mask = self.transform(img, mask, class_idx)

        return img_name, transformed_img, transformed_mask

In [7]:
# Load the data
root_dir = "/content/drive/MyDrive/Colab Notebooks/Lab3/Images"
# Only the training split is built with is_train=True, which turns on the random
# flips and colour jitter in CustomDataset.transform.
train_dataset = CustomDataset(root_dir, ["train"], class_list, is_train = True)
test_dataset = CustomDataset(root_dir, ["test"], class_list, is_train = False)

num_workers = 2
batch_size = 16

# Training batches are shuffled; the test loader keeps the dataset order.
train_loader = DataLoader(train_dataset, batch_size = batch_size, num_workers = num_workers, shuffle = True)
test_loader = DataLoader(test_dataset, batch_size = batch_size, num_workers = num_workers, shuffle = False)

print(f'Train: {len(train_dataset)}, Test: {len(test_dataset)}')

Train: 1766, Test: 434


In [None]:
# Print one indexed mask (0 = background, otherwise the class index)
_, _, mask = train_dataset[1300]
mask_np = mask.numpy()
# Scope the "print every element" options to this cell so the rest of the
# notebook keeps NumPy's default, truncated array printing.
with np.printoptions(threshold=np.inf, linewidth=np.inf):
    print(mask_np)

In [8]:
# Use the first CUDA GPU when available, otherwise fall back to the CPU.
# `device` is read as a global by the training and evaluation functions below.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}')

Device: cuda:0


In [9]:
# UNet model
class DoubleConv(nn.Module):
    """Two consecutive 3x3 convolution -> batch-norm -> ReLU stages.

    The first stage maps ``in_channels`` to ``out_channels``; the second keeps
    ``out_channels``. Padding of 1 preserves the spatial size.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        # Layers are appended in conv, norm, activation order for each stage so the
        # Sequential indices (and therefore the state_dict keys) stay 0..5.
        for stage_in in (in_channels, out_channels):
            stages += [
                nn.Conv2d(stage_in, out_channels, kernel_size=3, stride=1, padding=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x`` and return the result."""
        return self.conv(x)

class UNet(nn.Module):
    """U-Net encoder/decoder that returns per-pixel class logits.

    Args:
        in_channels: Number of channels of the input image.
        out_channels: Number of output classes (including background).
        features: Channel widths of the encoder levels, shallowest first. The
            decoder mirrors them and the bottleneck uses twice the last width.

    ``forward`` returns raw logits of shape ``(N, out_channels, H, W)``.
    ``nn.CrossEntropyLoss`` applies log-softmax itself, so feeding it softmax
    output would normalise twice and flatten the gradients. Apply
    ``torch.softmax(logits, dim=1)`` explicitly when probabilities are needed;
    ``argmax`` over dim 1 is identical for logits and probabilities.
    """

    def __init__(
            self, in_channels=3, out_channels=4, features=(64, 128, 256, 512),
    ):
        super(UNet, self).__init__()
        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Down part of UNET
        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        # Up part of UNET: entries alternate between an upsampling transposed
        # convolution (even index) and the DoubleConv applied after the skip
        # concatenation (odd index).
        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(
                    feature*2, feature, kernel_size=2, stride=2,
                )
            )
            self.ups.append(DoubleConv(feature*2, feature))

        self.bottleneck = DoubleConv(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        """Return class logits with the same spatial size as ``x``."""
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)
        # Deepest skip first, matching the order in which the decoder consumes them
        skip_connections = skip_connections[::-1]

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx//2]

            # Pooling floors odd sizes, so the upsampled map can be smaller than
            # the skip tensor; only the spatial dimensions can differ here.
            if x.shape[2:] != skip_connection.shape[2:]:
                x = transform.resize(x, size=skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx+1](concat_skip)

        # Raw logits: no softmax here, the loss function normalises them.
        return self.final_conv(x)

In [11]:
# Training function
def train_epoch(model, loader, optimizer, loss_func, n_classes, visualize_every=1):
    """Train ``model`` for one pass over ``loader`` and return the mean batch loss.

    Args:
        model: Network to train; switched to training mode.
        loader: Iterable yielding ``(image_paths, images, masks)`` batches.
        optimizer: Optimizer updating ``model``'s parameters.
        loss_func: Callable ``loss_func(predictions, masks)`` returning a scalar loss.
        n_classes: Number of classes, forwarded to the visualisation.
        visualize_every: Plot the first sample of every N-th batch. The default of
            1 plots every batch; 0 or None disables plotting.

    Returns:
        The average loss over all batches, or NaN when ``loader`` yields no batch.
    """
    model.train()

    total_loss = 0.0
    batch_count = 0

    for batch_idx, (image_paths, images, masks) in enumerate(loader):
        images = images.to(device)
        masks = masks.to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()
        predictions = model(images)
        loss = loss_func(predictions, masks)
        # Compute the gradients used for the weight update
        loss.backward()
        # Update the model weights with the computed gradients
        optimizer.step()

        total_loss += loss.item()
        batch_count += 1

        if visualize_every and batch_idx % visualize_every == 0:
            visualize_image_and_masks(image_paths[0], images[0], masks[0], predictions, n_classes)

    # Counting batches while iterating avoids a ZeroDivisionError on an empty loader
    if batch_count == 0:
        return float("nan")

    return total_loss / batch_count

In [12]:
# Compute the Dice score
def dice_score(true_mask, predicted_mask):
    """Return the Dice coefficient ``2*TP / (2*TP + FP + FN)`` of two masks.

    Both arguments are element-wise compared against True/False, so they are
    expected to be boolean arrays of the same shape. When neither mask has a
    positive element the denominator is zero and 0.0 is returned.
    """
    truth_on = true_mask == True
    truth_off = true_mask == False
    pred_on = predicted_mask == True
    pred_off = predicted_mask == False

    true_positives = np.sum(truth_on & pred_on)
    false_positives = np.sum(pred_on & truth_off)
    false_negatives = np.sum(truth_on & pred_off)

    denominator = 2 * true_positives + false_positives + false_negatives
    if denominator > 0:
        return 2 * true_positives / denominator
    return 0.0

In [13]:
# Compute the IoU (intersection over union)
def iou_score(true_mask, predicted_mask):
    """Return intersection-over-union of two masks, or 0 when the union is empty."""
    # Elements set in both masks
    overlap = np.logical_and(true_mask, predicted_mask).sum()

    # Elements set in at least one mask
    combined = np.logical_or(true_mask, predicted_mask).sum()

    if combined == 0:
        return 0
    return overlap / combined

In [14]:
def evaluate_model(model, loader, n_classes):
    """Evaluate ``model`` on ``loader`` and return segmentation metrics in percent.

    Pixel accuracy is computed over all pixels. Dice, IoU and F1 are computed per
    batch and only over pixels whose ground-truth label is one of the foreground
    classes ``1 .. n_classes - 1``; the per-batch values are then averaged.

    A class that appears in neither the ground truth nor the prediction of a
    batch has an undefined (0/0) Dice and IoU. Such entries are skipped instead
    of being counted as 0, which would otherwise pull the means down whenever a
    batch does not contain every class.

    Args:
        model: Network to evaluate; switched to eval mode.
        loader: Iterable yielding ``(image_paths, images, true_masks)`` batches.
        n_classes: Total number of classes including background (label 0).

    Returns:
        Dict with keys ``mean_iou``, ``accuracy``, ``mean_dice_score``,
        ``f1_micro`` and ``f1_macro``. A metric with nothing to average is 0.0.
    """
    def _mean(values):
        # Guard against ZeroDivisionError when no batch contributed a value
        return sum(values) / len(values) if values else 0.0

    model.eval()

    total_correct = 0
    total_pixels = 0
    dice_scores = []
    ious = []
    f1_micros = []
    f1_macros = []

    valid_labels = list(range(1, n_classes))

    with torch.no_grad():
        for image_paths, images, true_masks in loader:
            images, true_masks = images.to(device), true_masks.to(device)
            predictions = model(images)
            _, predicted_masks = torch.max(predictions, 1)

            total_correct += (predicted_masks == true_masks).sum().item()
            total_pixels += true_masks.nelement()

            true_masks_np = true_masks.cpu().numpy().flatten()
            predicted_masks_np = predicted_masks.cpu().numpy().flatten()

            # Keep only pixels whose true label is a foreground class
            mask_indices = np.isin(true_masks_np, valid_labels)
            if not mask_indices.any():
                # Background-only batch: the class metrics have no pixels to score
                continue

            valid_true_masks_np = true_masks_np[mask_indices]
            valid_predicted_masks_np = predicted_masks_np[mask_indices]

            for label in valid_labels:
                # Pixels that belong to / were predicted as this label
                label_true_masks = (valid_true_masks_np == label)
                label_predicted_masks = (valid_predicted_masks_np == label)

                # Label absent from both masks: score is undefined, do not count it
                if not (label_true_masks.any() or label_predicted_masks.any()):
                    continue

                # How well the predicted mask overlaps the true mask
                dice_scores.append(dice_score(label_true_masks, label_predicted_masks))
                ious.append(iou_score(label_true_masks, label_predicted_masks))

            f1_micros.append(f1_score(valid_true_masks_np, valid_predicted_masks_np, average='micro', zero_division=0))
            f1_macros.append(f1_score(valid_true_masks_np, valid_predicted_masks_np, average='macro', zero_division=0))

    pixel_accuracy = total_correct / total_pixels if total_pixels else 0.0

    metrics = {
        'mean_iou': _mean(ious) * 100,
        'accuracy': pixel_accuracy * 100,
        'mean_dice_score': _mean(dice_scores) * 100,
        'f1_micro': _mean(f1_micros) * 100,
        'f1_macro': _mean(f1_macros) * 100
    }

    return metrics

In [15]:
def train_and_eval(model, loader_train, loader_test, epoch_count, lr, n_classes, save_path=None):
    """Train ``model`` for ``epoch_count`` epochs, reporting metrics after each one.

    After every epoch the model is evaluated on both loaders and the results are
    printed. The weights are saved on every even-numbered epoch (0, 2, 4, ...).

    Args:
        model: Network to train.
        loader_train: Loader used for training and for the training metrics.
        loader_test: Loader used for the test metrics.
        epoch_count: Number of epochs to run.
        lr: Learning rate for the Adam optimizer.
        n_classes: Total number of classes including background.
        save_path: Destination of the checkpoint. When omitted, the notebook-level
            ``saved_model_path`` variable is used, as before.
    """
    def _format_metrics(title, metrics):
        # One report line; the same layout is used for the train and test metrics
        return (f"    {title} Metrics: IoU: {metrics['mean_iou']:.2f}%, "
                f"Accuracy: {metrics['accuracy']:.2f}%, "
                f"Dice: {metrics['mean_dice_score']:.2f}%, "
                f"Micro-F1: {metrics['f1_micro']:.2f}%, "
                f"Macro-F1: {metrics['f1_macro']:.2f}% ")

    if save_path is None:
        # Backward-compatible fallback to the global defined in the run cell
        save_path = saved_model_path

    optimizer = torch.optim.Adam(model.parameters(), lr = lr)
    loss_func = torch.nn.CrossEntropyLoss().to(device)

    start_time = datetime.now()

    for epoch in range(epoch_count):
        avg_train_loss = train_epoch(model, loader_train, optimizer, loss_func, n_classes)

        train_metrics = evaluate_model(model, loader_train, n_classes)
        test_metrics = evaluate_model(model, loader_test, n_classes)

        current_time = datetime.now()
        elapsed_time = (current_time - start_time).total_seconds()

        # seconds_to_time already appends the unit suffix, so no extra "s" here
        print(f'Epoch: {epoch}, Time Elapsed: {seconds_to_time(elapsed_time)}, Training Loss: {avg_train_loss:.2f}')
        print(_format_metrics("Training", train_metrics))
        print(_format_metrics("Testing", test_metrics))

        if epoch % 2 == 0:
            torch.save(model.state_dict(), save_path)
            print("=> Epoch saved")

In [16]:
# Image and mask visualisation
def visualize_image_and_masks(image_path, image, true_mask, predictions, n_classes, mean=0.5, std=0.5):
    """Plot an image next to its ground-truth mask and the predicted mask.

    Args:
        image_path: Path of the image; printed above the figure.
        image: Normalised image tensor of shape (C, H, W).
        true_mask: Ground-truth label tensor of shape (H, W).
        predictions: Model output of shape (N, n_classes, H, W); the mask of the
            first sample in the batch is shown.
        n_classes: Number of classes, used to size the colour map.
        mean: Mean used when the image was normalised.
        std: Standard deviation used when the image was normalised.

    The defaults for ``mean`` and ``std`` match the ``Normalize((0.5, ...),
    (0.5, ...))`` step of ``CustomDataset``. Without undoing it the values lie in
    [-1, 1] and ``imshow`` clips everything below 0, darkening the picture.
    """
    print(f"Image Path: {image_path}")
    pred_mask = torch.argmax(predictions, dim=1).cpu().numpy()

    colors = ['black', 'yellow', 'orange', 'red']
    cmap = ListedColormap(colors[:n_classes])

    # Undo the normalisation and move channels last for matplotlib
    display_image = (image.detach().cpu() * std + mean).clamp(0, 1).permute(1, 2, 0).numpy()

    fig, (ax_image, ax_true, ax_pred) = plt.subplots(1, 3, figsize=(15, 5))

    # Original image
    ax_image.imshow(display_image)
    ax_image.set_title('Original Image')
    ax_image.axis('off')

    # Ground-truth mask
    ax_true.imshow(true_mask.cpu().numpy(), cmap=cmap, interpolation='nearest', vmin=0, vmax=n_classes - 1)
    ax_true.set_title('True Mask')
    ax_true.axis('off')

    # Predicted mask
    ax_pred.imshow(pred_mask[0], cmap=cmap, interpolation='nearest', vmin=0, vmax=n_classes - 1)
    ax_pred.set_title('Predicted Mask')
    ax_pred.axis('off')

    plt.show()
    # This function is called once per batch; release the figure explicitly so
    # figures do not accumulate in memory.
    plt.close(fig)

In [17]:
# One output channel per class in class_list plus background (label 0)
n_classes = len(class_list) + 1
# The channel count of the first training image determines the model's input width
model = UNet(train_dataset[0][1].shape[0], n_classes).to(device)
print(f'Parameter count: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}')

saved_model_path = '/content/drive/MyDrive/Colab Notebooks/Lab3/saved_model.pth'
# Resume from the checkpoint when one exists. On a first run there is nothing to
# load yet and training starts from freshly initialised weights.
if os.path.exists(saved_model_path):
    model.load_state_dict(torch.load(saved_model_path, map_location=device))

train_and_eval(model, train_loader, test_loader, epoch_count=1, lr=1e-3, n_classes=n_classes)

Output hidden; open in https://colab.research.google.com to view.