### Necessary Libraries and Frameworks

In [8]:
!pip install optuna
!pip install scikit-learn
!pip install segmentation-models-pytorch timm
import os
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from sklearn.model_selection import train_test_split
import torchvision.transforms as transforms
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np
import matplotlib.pyplot as plt

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [9]:
import segmentation_models_pytorch as smp

In [10]:
# Number of passes over the training loader; used by the Optuna objective and by the final training run.
NUM_EPOCHS = 5
# Target size handed to transforms.Resize for both images and masks.
IMAGE_SIZE = (384, 512)
# Per-channel statistics handed to transforms.Normalize for the input images.
# NOTE(review): STD equals the widely used ImageNet std, while MEAN does not match the
# ImageNet mean — presumably MEAN was measured on this dataset (see the mean/std cell
# near the end of the notebook). Confirm whether STD should be measured the same way.
MEAN = [0.7059, 0.7097, 0.7028]
STD = [0.229, 0.224, 0.225]
# Cut-off applied to the sigmoid probabilities when building the predicted binary mask.
THRESHOLD = 0.6

### Directories

In [11]:
# Folder listed for input images (paths are relative to the notebook's working directory).
IMAGE_DIR = 'Mikroplastikai/images'
# Folder listed for the masks; files are paired with images by basename in the tuning cell.
MASK_DIR = 'Mikroplastikai/masks'
# File the trained model's state_dict is written to (and reloaded from) after training.
MODEL_PATH = 'microplastic_segmentation_model.pth'

### Microplastics Dataset

In [12]:
class MicroplasticsDataset(Dataset):
    """Dataset of (image, mask) pairs read from two parallel lists of file paths.

    Args:
        image_paths: Sequence of image file paths.
        mask_paths: Sequence of mask file paths; element ``i`` is the mask for
            ``image_paths[i]``.
        image_transforms: Optional callable applied to the RGB PIL image.
        mask_transforms: Optional callable applied to the single-channel PIL mask.

    Raises:
        ValueError: If the two path sequences differ in length.
    """

    def __init__(self, image_paths, mask_paths, image_transforms=None, mask_transforms=None):
        # A length mismatch would otherwise pair images with the wrong masks or
        # fail later with an IndexError in the middle of an epoch.
        if len(image_paths) != len(mask_paths):
            raise ValueError(
                f'image_paths and mask_paths must have the same length, '
                f'got {len(image_paths)} and {len(mask_paths)}'
            )
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.image_transforms = image_transforms
        self.mask_transforms = mask_transforms

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load pair ``idx`` and return ``(image, mask)`` after the optional transforms."""
        # convert() loads the pixel data and returns a new image, so the underlying
        # file can be closed immediately instead of lingering until garbage collection.
        with Image.open(self.image_paths[idx]) as image_file:
            image = image_file.convert('RGB')

        # Force one channel: the model in this notebook emits a single output channel,
        # so a mask stored as RGB/RGBA would give a target with a mismatching shape.
        with Image.open(self.mask_paths[idx]) as mask_file:
            mask = mask_file.convert('L')

        if self.image_transforms:
            image = self.image_transforms(image)

        if self.mask_transforms:
            mask = self.mask_transforms(mask)

        return image, mask

### Train and test datasets and data loader

In [13]:
def get_data_loaders_and_datasets(train_images, test_images, train_masks, test_masks, batch_size):
    """Build the train/test datasets and their DataLoaders.

    Args:
        train_images: Image paths for the training split.
        test_images: Image paths for the test split.
        train_masks: Mask paths aligned with ``train_images``.
        test_masks: Mask paths aligned with ``test_images``.
        batch_size: Batch size used by both loaders.

    Returns:
        Tuple ``(train_loader, test_loader, train_dataset, test_dataset)``.
    """
    # Images: resize, convert to a float tensor, then normalise per channel.
    image_transform = transforms.Compose([
        transforms.Resize(IMAGE_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(mean=MEAN, std=STD)
    ])

    # Masks hold labels, not intensities. The default bilinear resize blends neighbouring
    # labels into fractional values along object borders; nearest-neighbour keeps every
    # resized pixel equal to one of the original label values.
    mask_transform = transforms.Compose([
        transforms.Resize(IMAGE_SIZE, interpolation=transforms.InterpolationMode.NEAREST),
        transforms.ToTensor()
    ])

    # datasets
    train_dataset = MicroplasticsDataset(train_images, train_masks, image_transforms=image_transform, mask_transforms=mask_transform)
    test_dataset = MicroplasticsDataset(test_images, test_masks, image_transforms=image_transform, mask_transforms=mask_transform)

    # loaders: only the training data is shuffled, so test batches keep the order of
    # test_dataset.image_paths.
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, test_loader, train_dataset, test_dataset


### Optuna objective function

In [14]:
def objective(trial):
    """Optuna objective: train a U-Net for NUM_EPOCHS and score it on the test loader.

    Samples a batch size, a log-uniform learning rate and an optimizer name from
    ``trial``, trains a fresh ``smp.Unet`` (resnet101 encoder) with Dice loss and
    returns the Dice loss averaged over the batches of ``test_loader``.
    """
    # Hyperparameters are sampled in a fixed order: batch size, learning rate, optimizer.
    batch_size = trial.suggest_categorical('batch_size', [4, 6, 8, 12, 16])
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD'])

    # Only the two loaders are needed here; the datasets are discarded.
    loaders_and_datasets = get_data_loaders_and_datasets(
        train_images, test_images, train_masks, test_masks, batch_size)
    train_loader, test_loader = loaders_and_datasets[0], loaders_and_datasets[1]

    model = smp.Unet(encoder_name='resnet101', encoder_weights='imagenet', in_channels=3, classes=1).to(device)

    # Optimizer factories keyed by name; any name not listed falls back to SGD with momentum.
    optimizer_factories = {
        'Adam': lambda params: optim.Adam(params, lr=learning_rate),
        'RMSprop': lambda params: optim.RMSprop(params, lr=learning_rate),
    }
    sgd_factory = lambda params: optim.SGD(params, lr=learning_rate, momentum=0.9)
    optimizer = optimizer_factories.get(optimizer_name, sgd_factory)(model.parameters())

    criterion = smp.losses.DiceLoss(mode='binary', from_logits=True)

    # Training: plain loop over the training loader, NUM_EPOCHS times.
    model.train()
    for _ in range(NUM_EPOCHS):
        for batch_images, batch_masks in train_loader:
            batch_images = batch_images.to(device)
            batch_masks = batch_masks.to(device)
            optimizer.zero_grad()
            batch_loss = criterion(model(batch_images), batch_masks)
            batch_loss.backward()
            optimizer.step()

    # Evaluation: collect the per-batch losses without tracking gradients.
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for batch_images, batch_masks in test_loader:
            batch_images = batch_images.to(device)
            batch_masks = batch_masks.to(device)
            batch_losses.append(criterion(model(batch_images), batch_masks).item())

    return sum(batch_losses) / len(test_loader)


### Hyperparameter fine-tuning

In [15]:
# plotly is installed here, next to the optuna.visualization calls further down in this cell.
!pip install plotly

# Pair images and masks by basename (file name without extension) for consistency.
# Only basenames present in BOTH folders are kept; files without a counterpart are dropped silently.
img_files = sorted(f for f in os.listdir(IMAGE_DIR) if os.path.isfile(os.path.join(IMAGE_DIR, f)))
mask_files = sorted(f for f in os.listdir(MASK_DIR) if os.path.isfile(os.path.join(MASK_DIR, f)))
# basename -> file name; if two files in one folder share a basename, the later one in sorted order wins.
img_map = {os.path.splitext(f)[0]: f for f in img_files}
mask_map = {os.path.splitext(f)[0]: f for f in mask_files}
common = sorted(set(img_map) & set(mask_map))
# Parallel lists: images[i] and masks[i] share the same basename.
images = [os.path.join(IMAGE_DIR, img_map[k]) for k in common]
masks = [os.path.join(MASK_DIR, mask_map[k]) for k in common]
# 80/20 split with a fixed random_state so the partition is the same on every run.
train_images, test_images, train_masks, test_masks = train_test_split(images, masks, test_size=0.2, random_state=42)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed PyTorch (weight init of the non-pretrained layers, DataLoader shuffling) and the
# Optuna sampler so that repeated runs start from the same hyperparameter suggestions.
# 42 matches the random_state already used for the train/test split.
# NOTE: GPU kernels can still be non-deterministic, so trial values may differ slightly.
torch.manual_seed(42)

# Create a study object and perform optimization (TPE is Optuna's default sampler;
# it is passed explicitly only to fix its seed).
study = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler(seed=42))
study.optimize(objective, n_trials=15)

print("Best hyperparameters:", study.best_trial.params)

# Plot the optimization history
fig = optuna.visualization.plot_optimization_history(study)
fig.show()

# Plot individual hyperparameters' effects on the outcomes
fig = optuna.visualization.plot_slice(study)
fig.show()

# Plot the contour of hyperparameters vs. objective value
fig = optuna.visualization.plot_contour(study, params=['batch_size', 'learning_rate', 'optimizer'])
fig.show()

# Parallel coordinate plot of the hyperparameters
fig = optuna.visualization.plot_parallel_coordinate(study)
fig.show()

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


[I 2025-05-15 14:23:50,488] A new study created in memory with name: no-name-8fd36d7d-76a8-4544-9a77-8086689e3c41
[I 2025-05-15 14:40:12,805] Trial 0 finished with value: 0.991955123164437 and parameters: {'batch_size': 8, 'learning_rate': 0.00016005752156053759, 'optimizer': 'SGD'}. Best is trial 0 with value: 0.991955123164437.
[I 2025-05-15 14:58:36,779] Trial 1 finished with value: 0.2717207372188568 and parameters: {'batch_size': 6, 'learning_rate': 0.004345398546665549, 'optimizer': 'Adam'}. Best is trial 1 with value: 0.2717207372188568.
[W 2025-05-15 15:02:57,683] Trial 2 failed with parameters: {'batch_size': 6, 'learning_rate': 0.009475305219368756, 'optimizer': 'Adam'} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/scratch/lustre/home/lupu8256/.local/lib/python3.8/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
  File "/tmp/slurm-lupu8256-173106/ipykernel_90271/693595577.py"

KeyboardInterrupt: 

### Training the Model

In [None]:
# Train the final model with the best hyperparameters found by the Optuna study.
best_params = study.best_trial.params
model = smp.Unet(encoder_name='resnet101', encoder_weights='imagenet', in_channels=3, classes=1).to(device)

if best_params['optimizer'] == 'Adam':
    optimizer = optim.Adam(model.parameters(), lr=best_params['learning_rate'])
elif best_params['optimizer'] == 'RMSprop':
    optimizer = optim.RMSprop(model.parameters(), lr=best_params['learning_rate'])
else:
    optimizer = optim.SGD(model.parameters(), lr=best_params['learning_rate'], momentum=0.9)

# NOTE(review): the Optuna objective scores trials with smp.losses.DiceLoss, while this
# final run optimises BCEWithLogitsLoss — confirm that the mismatch is intentional.
criterion = nn.BCEWithLogitsLoss()
train_loader, test_loader, train_dataset, test_dataset = get_data_loaders_and_datasets(
    train_images, test_images, train_masks, test_masks, best_params['batch_size'])

# Per-epoch history. These lists are read by plot_metrics below and by the
# "Additional plots" cells; previously nothing ever created them (NameError).
epoch_losses = []
epoch_accuracy = []
epoch_precision = []
epoch_recall = []
epoch_f1 = []
epoch_iou = []

for epoch in range(NUM_EPOCHS):
    # ---- training pass: record the mean training loss of the epoch ----
    model.train()  # needed every epoch because the evaluation pass switches to eval mode
    running_loss = 0.0
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    epoch_losses.append(running_loss / max(len(train_loader), 1))

    # ---- evaluation pass: per-image pixel metrics on the test loader, then averaged ----
    # Rows of metric_sums: accuracy, precision, recall, F1, IoU.
    model.eval()
    metric_sums = torch.zeros(5, dtype=torch.float64)
    evaluated_images = 0
    with torch.no_grad():
        for images, masks in test_loader:
            images, masks = images.to(device), masks.to(device)
            # One row of booleans per image.
            predicted = (torch.sigmoid(model(images)) > THRESHOLD).flatten(1)
            target = (masks > 0.5).flatten(1)

            tp = (predicted & target).sum(dim=1).double()
            fp = (predicted & ~target).sum(dim=1).double()
            fn = (~predicted & target).sum(dim=1).double()
            tn = (~predicted & ~target).sum(dim=1).double()

            # clamp(min=1) turns every 0/0 case into 0, the same convention as
            # zero_division=0 and the empty-union rule used in the evaluation cell.
            per_image = torch.stack([
                (tp + tn) / (tp + tn + fp + fn),
                tp / (tp + fp).clamp(min=1),
                tp / (tp + fn).clamp(min=1),
                2 * tp / (2 * tp + fp + fn).clamp(min=1),
                tp / (tp + fp + fn).clamp(min=1),
            ])
            metric_sums += per_image.sum(dim=1).cpu()
            evaluated_images += predicted.size(0)

    epoch_means = (metric_sums / max(evaluated_images, 1)).tolist()
    histories = (epoch_accuracy, epoch_precision, epoch_recall, epoch_f1, epoch_iou)
    for history, value in zip(histories, epoch_means):
        history.append(value)

# Persist the weights, then reload them from disk into the same model object.
torch.save(model.state_dict(), MODEL_PATH)
model.load_state_dict(torch.load(MODEL_PATH))

epochs = range(0, len(epoch_accuracy))

# plot_metrics is defined in the "Utility functions" section at the bottom of the
# notebook; that cell has to be executed before this call.
plot_metrics(epochs, epoch_accuracy, epoch_precision, epoch_recall, epoch_f1, epoch_iou, epoch_losses)

print(f'Epoch accuracies: {epoch_accuracy}')
print(f'Epoch precisions: {epoch_precision}')
print(f'Epoch recalls: {epoch_recall}')
print(f'Epoch F1 Scores: {epoch_f1}')
print(f'Epoch IoUs: {epoch_iou}')
print(f'Epoch losses: {epoch_losses}')


In [None]:
# Dump the raw per-image metric lists, one list per line, in the order
# accuracy, precision, recall, F1, IoU.
for per_image_scores in (accuracy_list, precision_list, recall_list, f1_list, iou_list):
    print(per_image_scores)

### Evaluation and Visualization

In [None]:
# Initialize lists to store metrics (one entry per test image)
accuracy_list = []
precision_list = []
recall_list = []
f1_list = []
iou_list = []

# Evaluate the model and visualize the results.
# tensor_to_numpy_binary and calculate_iou are defined in the "Utility functions"
# section at the bottom of the notebook; those cells must have been executed first.
model.eval()
with torch.no_grad():
    # Running 1-based index of the image, used only in the plot titles
    image_counter = 0

    for i, (images, masks) in enumerate(test_loader):
        images, masks = images.to(device), masks.to(device)
        outputs = model(images)

        for j in range(images.size(0)):
            image_counter += 1

            # Apply sigmoid to get probabilities and then threshold to get binary output
            predicted_probs = torch.sigmoid(outputs[j])
            predicted_mask = (predicted_probs > THRESHOLD).float()

            # Convert the tensors to binary format for metric calculation
            mask_np = tensor_to_numpy_binary(masks[j])
            predicted_mask_np = tensor_to_numpy_binary(predicted_mask)

            # Flatten the arrays for metric calculations
            mask_np_flat = mask_np.flatten()
            predicted_mask_np_flat = predicted_mask_np.flatten()

            # Calculate metrics for each image and append to the lists
            # (zero_division=0 scores an undefined precision/recall/F1 as 0)
            accuracy_list.append(accuracy_score(mask_np_flat, predicted_mask_np_flat))
            precision_list.append(precision_score(mask_np_flat, predicted_mask_np_flat, zero_division=0))
            recall_list.append(recall_score(mask_np_flat, predicted_mask_np_flat, zero_division=0))
            f1_list.append(f1_score(mask_np_flat, predicted_mask_np_flat, zero_division=0))

            iou_score = calculate_iou(predicted_mask_np, mask_np)
            iou_list.append(iou_score)

            # Visualization - one figure per test image with three panels:
            # original image, ground-truth mask, predicted mask
            plt.figure(figsize=(15, 5))

            # Original Image (without normalization), re-read from disk.
            # The index arithmetic relies on test_loader being built with shuffle=False,
            # so batch i, item j maps to position i * batch_size + j of the dataset.
            original_image_path = test_dataset.image_paths[i * test_loader.batch_size + j]
            original_image = Image.open(original_image_path)
            plt.subplot(1, 3, 1)
            plt.imshow(original_image)
            plt.title(f'Orginali Nuotrauka {image_counter}')
            plt.axis('off')

            # Ground Truth Mask
            plt.subplot(1, 3, 2)
            plt.imshow(masks[j].cpu().squeeze(), cmap='gray')
            plt.title(f'Tikrasis Užmaskavimas {image_counter}')
            plt.axis('off')

            # Predicted Mask
            plt.subplot(1, 3, 3)
            plt.imshow(predicted_mask.cpu().squeeze(), cmap='gray')
            plt.title(f'Užmaskavimo Prognozė {image_counter}')
            plt.axis('off')

            plt.show()

# Calculate and print average metrics (unweighted mean over the per-image values)
avg_accuracy = np.mean(accuracy_list)
avg_precision = np.mean(precision_list)
avg_recall = np.mean(recall_list)
avg_f1 = np.mean(f1_list)
avg_iou = np.mean(iou_list)

print(f'Average Accuracy: {avg_accuracy:.4f}')
print(f'Average Precision: {avg_precision:.4f}')
print(f'Average Recall: {avg_recall:.4f}')
print(f'Average F1 Score: {avg_f1:.4f}')
print(f'Average IoU: {avg_iou:.4f}')


In [None]:
# Save the model weights (state_dict only) to the configured checkpoint path.
# MODEL_PATH holds the same file name that used to be hard-coded here, so the
# training cell and this cell can no longer drift apart.
torch.save(model.state_dict(), MODEL_PATH)

### Additional plots

In [None]:
# Accuracy curve only: every other series is passed as None and therefore skipped.
plot_metrics(epochs, epoch_accuracy, None, None, None, None, None)

In [None]:
# Loss curve only: every other series is passed as None and therefore skipped.
plot_metrics(epochs, None, None, None, None, None, epoch_losses)

In [None]:
# All metrics with the first 10 epoch results skipped (index 10 onwards is kept).
# NOTE(review): if fewer than 11 epochs were recorded these slices are empty —
# NUM_EPOCHS is 5 in the config cell; confirm the intended number of epochs.
epochs_skipped = epochs[10:]
epoch_accuracy_skipped = epoch_accuracy[10:]
epoch_precision_skipped = epoch_precision[10:]
epoch_recall_skipped = epoch_recall[10:]
epoch_f1_skipped = epoch_f1[10:]
epoch_iou_skipped = epoch_iou[10:]
epoch_losses_skipped = epoch_losses[10:]

In [None]:
# All six series for the epochs kept after skipping the first 10.
plot_metrics_skipped(epochs_skipped, epoch_accuracy_skipped, epoch_precision_skipped, epoch_recall_skipped, epoch_f1_skipped, epoch_iou_skipped, epoch_losses_skipped)

In [None]:
# Accuracy only, for the epochs kept after skipping the first 10.
plot_metrics_skipped(epochs_skipped, epoch_accuracy_skipped, None, None, None, None, None)

In [None]:
# Loss only, for the epochs kept after skipping the first 10.
plot_metrics_skipped(epochs_skipped, None, None, None, None, None, epoch_losses_skipped)

In [None]:
# Precision, recall, F1 and IoU (no accuracy, no loss) for the epochs kept after skipping the first 10.
plot_metrics_skipped(epochs_skipped, None, epoch_precision_skipped, epoch_recall_skipped, epoch_f1_skipped, epoch_iou_skipped, None)

### Calculate mean and standard deviation

In [None]:
class CustomImageDataset(Dataset):
    """Image-only dataset over the png/jpg/jpeg files found directly in ``image_dir``.

    Args:
        image_dir: Directory to list (not recursive).
        transform: Optional callable applied to each RGB PIL image.

    Attributes:
        image_paths: Sorted list of the selected file paths.
    """

    def __init__(self, image_dir, transform=None):
        # sorted(): os.listdir returns entries in arbitrary order, so sorting makes
        # indexing reproducible across machines and runs.
        # lower(): also accept upper-case extensions such as "IMG_01.PNG" or "scan.JPG".
        self.image_paths = [
            os.path.join(image_dir, img)
            for img in sorted(os.listdir(image_dir))
            if img.lower().endswith(('png', 'jpg', 'jpeg'))
        ]
        self.transform = transform

    def __len__(self):
        """Return the number of selected image files."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and return it after the optional transform."""
        img_path = self.image_paths[idx]
        # convert() loads the pixels and returns a new image, so the file handle can
        # be closed right away instead of staying open until garbage collection.
        with Image.open(img_path) as image_file:
            image = image_file.convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image

# Resize and convert to a tensor only — no Normalize here, because the statistics
# computed below are meant to be taken from the un-normalised pixel values.
image_transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor()
])

# Every png/jpg/jpeg file found directly in IMAGE_DIR.
dataset = CustomImageDataset(
    image_dir=IMAGE_DIR,
    transform=image_transform
)

# Order does not matter for the statistics, so no shuffling; batches of 10 images.
loader = DataLoader(dataset, batch_size=10, shuffle=False, num_workers=1)

# Function to calculate mean and std
def calculate_mean_std(loader):
    """Compute the per-channel mean and standard deviation over all pixels of a loader.

    The statistics are accumulated as running sums of ``x`` and ``x**2`` in float64,
    so the result is the standard deviation of the whole dataset. Averaging the
    per-image standard deviations instead (the previous behaviour) ignores the
    variation between images and therefore underestimates the spread.

    Args:
        loader: Iterable yielding image batches shaped ``(batch, channels, ...)``.

    Returns:
        Tuple ``(mean, std)`` of 1-D tensors with one entry per channel, in the
        dtype of the input batches (float32 for non-floating inputs).

    Raises:
        ValueError: If the loader yields no pixels at all.
    """
    channel_sum = None
    channel_sq_sum = None
    pixel_count = 0
    out_dtype = None

    for images in loader:
        batch_samples = images.size(0)
        # Collapse all spatial dimensions: (B, C, ...) -> (B, C, N)
        flat = images.reshape(batch_samples, images.size(1), -1)

        if channel_sum is None:
            out_dtype = flat.dtype if flat.is_floating_point() else torch.float32
            channel_sum = torch.zeros(flat.size(1), dtype=torch.float64, device=flat.device)
            channel_sq_sum = torch.zeros(flat.size(1), dtype=torch.float64, device=flat.device)

        # float64 accumulators keep E[x^2] - E[x]^2 numerically stable.
        flat = flat.to(torch.float64)
        channel_sum += flat.sum(dim=(0, 2))
        channel_sq_sum += (flat ** 2).sum(dim=(0, 2))
        pixel_count += batch_samples * flat.size(2)

    if pixel_count == 0:
        raise ValueError('calculate_mean_std received no images: the loader is empty')

    mean = channel_sum / pixel_count
    # Population variance; clamp guards against tiny negative values from rounding.
    variance = (channel_sq_sum / pixel_count - mean ** 2).clamp(min=0)
    std = variance.sqrt()

    return mean.to(out_dtype), std.to(out_dtype)

# Run the computation over every image in IMAGE_DIR and print the per-channel results;
# these are the kind of values the MEAN / STD constants in the config cell expect.
mean, std = calculate_mean_std(loader)
print(f'Mean: {mean}')
print(f'Std: {std}')

### Utility functions

In [None]:
def tensor_to_numpy_binary(tensor, threshold=0.5):
    """Convert a tensor to a binary (0/1) ``uint8`` NumPy array.

    A plain ``astype(np.uint8)`` cast truncates towards zero, so a mask value such
    as 0.99 became 0 and values >= 2 were not binary. Thresholding gives a true
    0/1 array and returns the same result as before for inputs that already
    contain only 0 and 1.

    Args:
        tensor: Tensor on any device; it is detached and moved to the CPU.
        threshold: Values strictly greater than this become 1, all others 0.

    Returns:
        ``np.ndarray`` of dtype ``uint8`` with the same shape as ``tensor``.
    """
    values = tensor.detach().cpu().numpy()
    return (values > threshold).astype(np.uint8)

In [None]:
def calculate_iou(predicted, target):
    """Intersection-over-union of two masks, treating non-zero entries as foreground.

    Returns 0.0 when the union is empty (neither mask has any foreground);
    otherwise the number of intersecting elements divided by the number of
    elements in the union.
    """
    union_count = np.logical_or(target, predicted).sum()
    # Guard clause: an empty union would otherwise mean dividing by zero.
    if union_count == 0:
        return 0.0
    overlap_count = np.logical_and(target, predicted).sum()
    return overlap_count / union_count

In [None]:
def plot_metrics(epochs, accuracy, precision, recall, f1, iou, losses):
    """Plot the given per-epoch series on one figure and show it.

    Args:
        epochs: Sequence of epoch indices used as x values.
        accuracy, precision, recall, f1, iou, losses: Per-epoch series aligned
            with ``epochs``; pass ``None`` to leave a series out of the plot.
    """
    plt.figure(figsize=(12, 8))

    # Metrics: (series, legend label) in plotting order.
    labelled_series = (
        (accuracy, 'Tikslumas'),
        (precision, 'Preciziškumas'),
        (recall, 'Atkūrimo statistika'),
        (f1, 'F1'),
        (iou, 'IoU'),
        (losses, 'Nuostolis'),
    )
    for values, label in labelled_series:
        # "is not None" instead of "!= None": the latter is evaluated element-wise
        # on NumPy arrays and then fails inside the if statement.
        if values is not None:
            plt.plot(epochs, values, label=label)

    plt.xlabel('Epizodas')
    plt.ylabel('Metrika')
    plt.title('Metrikų rezultatai epochose')
    plt.legend(loc='upper left')

    # The last tick is taken from the data. The function used to read a global
    # named last_epoch that is not defined anywhere in the notebook.
    epoch_values = list(epochs)
    if epoch_values:
        last_epoch = max(epoch_values)
        plt.xticks(np.arange(0, last_epoch + 1, 5))
    plt.grid(True)

    plt.tight_layout()
    plt.show()

In [None]:
def plot_metrics_skipped(epochs, accuracy, precision, recall, f1, iou, losses):
    """Plot per-epoch series for a run whose first 10 epochs were cut off.

    Same figure as ``plot_metrics`` except that the x ticks start at 10.

    Args:
        epochs: Sequence of epoch indices used as x values (expected to start at 10).
        accuracy, precision, recall, f1, iou, losses: Per-epoch series aligned
            with ``epochs``; pass ``None`` to leave a series out of the plot.
    """
    plt.figure(figsize=(12, 8))

    # (series, legend label) in plotting order.
    labelled_series = (
        (accuracy, 'Tikslumas'),
        (precision, 'Preciziškumas'),
        (recall, 'Atkūrimo statistika'),
        (f1, 'F1'),
        (iou, 'IoU'),
        (losses, 'Nuostolis'),
    )
    for values, label in labelled_series:
        if values is not None:
            plt.plot(epochs, values, label=label)

    plt.xlabel('Epizodas')
    plt.ylabel('Metrika')
    plt.title('Metrikų rezultatai epochose')
    plt.legend(loc='upper left')

    # The last tick is taken from the data. The function used to read a global
    # named last_epoch that is not defined anywhere in the notebook.
    # Ticks start at 10 because the first 10 epochs are the ones that were skipped.
    epoch_values = list(epochs)
    if epoch_values:
        last_epoch = max(epoch_values)
        plt.xticks(np.arange(10, last_epoch + 1, 5))
    plt.grid(True)

    plt.tight_layout()
    plt.show()