<a href="https://colab.research.google.com/github/hiebschi/MoSE_scripts/blob/main/loop_v01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Implementing a Segmentation Model

## 1. Preparations
### 1.1 Install required packages

In [3]:
import json
import os
import zipfile
import numpy as np
import geopandas as gpd
import sklearn
import torch
import matplotlib.pyplot as plt
import numpy as np
import torchvision

In [4]:
print(f"PyTorch version: {torch.__version__}\ntorchvision version: {torchvision.__version__}")

PyTorch version: 2.5.1+cu121
torchvision version: 0.20.1+cu121


### 1.2 Install segmentation model

In [5]:
!pip install segmentation-models-pytorch



In [6]:
import segmentation_models_pytorch as smp

### 1.3 Device agnostic code

In [7]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

### 1.4 Upload data to colab

In [5]:
# manual upload (for small files)
# from google.colab import files
# uploaded = files.upload()

In [8]:
# ACCESS TO GOOGLE DRIVE
################################################################################
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
################################################################################
# Data directories on GOOGLE DRIVE  # -------------------->> ADJUSTABLE
################################################################################
# npy preprocessed patches
preprocessed_patches_dir = '/content/drive/My Drive/Dokumente.GD/FS06 SS24/BACHELORARBEIT/MoSE/data/preprocessed_patches'
# JSON class codes
codes_dir = '/content/drive/My Drive/Dokumente.GD/FS06 SS24/BACHELORARBEIT/MoSE/data/classes'
# Shapefile class labels
labels_dir = '/content/drive/My Drive/Dokumente.GD/FS06 SS24/BACHELORARBEIT/MoSE/data/shapefiles'
# npy Masks
masks_dir = '/content/drive/My Drive/Dokumente.GD/FS06 SS24/BACHELORARBEIT/MoSE/data/masks'

In [10]:
# Load class labels
################################################################################
# specific Shapefile path
shp_path = os.path.join(labels_dir, "GSK_24_WGS84_adjusted.shp") # -------------------->> ADJUSTABLE

labels = gpd.read_file(shp_path) # read shapefile
labels_filtered = labels[labels["Elementtyp"].notnull()] # remove NULL
print(labels_filtered["Elementtyp"].unique()) # print all label classes

['Einzelstein' 'Wurzelstock' 'Steinverbauung' 'Totholz' 'Steinriegel'
 'Schotterbank' 'Schlamm_Sandinsel' 'Sand_Schlammbank' 'Schotterinsel']


In [11]:
# Load class codes
################################################################################
# path
label_codes_path = os.path.join(codes_dir, "label_codes.json")   # -------------------->> ADJUSTABLE

# Open and load the JSON file
with open(label_codes_path, "r") as json_file:
    label_codes = json.load(json_file)

print(label_codes)
len(label_codes)

{'Einzelstein': 1, 'Wurzelstock': 2, 'Steinverbauung': 3, 'Totholz': 4, 'Steinriegel': 5, 'Schotterbank': 6, 'Schlamm_Sandinsel': 7, 'Sand_Schlammbank': 8, 'Schotterinsel': 9}


9

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [10]:
# reversed dictionary
reversed_label_codes = {v: k for k, v in label_codes.items()} # v = value, k = key
reversed_label_codes[4]

'Totholz'

In [11]:
################################################################################
# Load preprocessed patches
################################################################################

# list of all .npz-files (.npz-compression)
################################################################################
patches_npz_list = [f for f in os.listdir(preprocessed_patches_dir) if f.endswith('.npz')]
patches_npz_list.sort()
patches_npz_list[0:3]

['A01_patch_0.npy.npz', 'A01_patch_1.npy.npz', 'A01_patch_10.npy.npz']

In [12]:
# ################################################################################
# # SLOW VERSION TO LOAD ALL PATCHES (arrays)
# ################################################################################

# # empty list for saving loaded arrays (patches)
# patches = []

# for idx, patch_npz_name in enumerate(patches_npz_list): # iterates over all npz-patches in the list
#   patch_npz_path = os.path.join(preprocessed_patches_dir, patch_npz_name) # path to npz-patch

#   with np.load(patch_npz_path) as data: # load npz-patch

#     array_keys = list(data.keys()) # access to array in the npz-patch
#     # print(f"Array Keys: {array_keys}")

#     if len(array_keys) > 1: # if more than 1 array in the npz-patch
#       print(f"WARNING MESSAGE: .npz-file '{patch_npz_name}' contains {len(array_keys)} arrays: {array_keys}") # Warning message

#     patch_name = patch_npz_name.replace(".npz", "") # change to .npy-name
#     array_data = data[array_keys[0]] # extract array data
#     patches.append((patch_name, array_data)) # save as tuple of patch name and corresponding data in the patches list

#     if idx % 100 == 0: # show progress
#       print(f"{idx}/{len(patches_npz_list)} files loaded.")

# print(f"In total {len(patches)} patches successfully loaded.")

In [14]:
################################################################################
# FAST VERSION
################################################################################

from concurrent.futures import ThreadPoolExecutor, as_completed # import ThreadPoolExecutor for parallelization (parallel loading of patches in each batch)

# Final list to save loaded patches (name, data)
patches = []

# Function to load a single .npz file and extract the first array
def load_npz_file(file_name, directory):
    """ Load an .npz file, extract the first array, and return (name, array). """
    file_path = os.path.join(directory, file_name) # path to .npz-file

    try:
        with np.load(file_path) as data:  # Load the .npz file

            array_keys = list(data.keys())  # Get all keys (array names)

            if len(array_keys) > 1:  # Print a warning if multiple arrays are present
                print(f".npz-file '{file_name}' contains {len(array_keys)} arrays: {array_keys}")

            patch_name = file_name.replace(".npz", "")  # Remove '.npz' to get the base name
            array_data = data[array_keys[0]]  # Extract the first array

            return (patch_name, array_data)  # Return a tuple (name, array)

    except Exception as e:  # Handle any errors during file loading
        print(f"Error loading {file_name}: {e}")
        return None

# Batch size to load files in chunks
batch_size = 100

# Use ThreadPoolExecutor for parallel file loading
for i in range(0, len(patches_npz_list), batch_size):
    batch_files = patches_npz_list[i:i + batch_size]  # Get a batch of files
    print(f"Processing batch {i // batch_size + 1}/{(len(patches_npz_list) // batch_size) + 1}...")

    # List to temporarily store loaded patches from the current batch
    batch_patches = []

    # Parallel loading within the batch
    with ThreadPoolExecutor(max_workers=8) as executor:  # Adjust max_workers as needed
        futures = [executor.submit(load_npz_file, file_name, preprocessed_patches_dir) for file_name in batch_files]

        # Collect results as they are completed
        for future in as_completed(futures):
            result = future.result()
            if result:  # Only append successful results
                batch_patches.append(result)

    patches.extend(batch_patches)  # Add the loaded batch to the final list
    print(f"Loaded {len(batch_patches)} patches in this batch.")

# Summary
print(f"\nIn total {len(patches)} patches successfully loaded.")


Processing batch 1/112...


KeyboardInterrupt: 

In [None]:
# patches

In [None]:
# Test preprocessed patch
################################################################################

#####################
SECTION = "A01" # -------------------->> ADJUSTABLE
TEST_PATCH_ID = 16 # -------------------->> ADJUSTABLE
#####################

test_patch_path = preprocessed_patches_dir + f"/{SECTION}_patch_{TEST_PATCH_ID}.npy.npz"

# load npz-file
test_patch_npz = np.load(test_patch_path)
# print(test_pre_patch_npz)

for array in test_patch_npz.files:
  print(array)
  test_patch = test_patch_npz[array]
  print(test_patch.shape)


# extract the RGB image
# test_pre_patch = test_pre_patch_npz['arr_0']
# print(test_pre_patch)

print(np.min(test_patch), np.max(test_patch))  # minimum and maximum
test_patch_normalized = test_patch - np.min(test_patch ) # set minimum to 0
test_patch_normalized = test_patch_normalized / np.max(test_patch_normalized)  # maximize to 1
print(np.min(test_patch_normalized), np.max(test_patch_normalized))  # minimum and maximum after normalization


# plot the preprocessed image
fig, ax = plt.subplots(figsize=(8, 8))
ax.imshow(test_patch_normalized.transpose(1, 2, 0))  # transpose for RGB depiction
ax.set_title("Preprocessed Patch")
plt.show()

In [None]:
################################################################################
# Load masks
################################################################################

masks_list = [f for f in os.listdir(masks_dir) if f.endswith('_mask.npy')]  # list of all masks
masks_list.sort() # sort list alphabetically
# masks_list

In [None]:
# Test mask
################################################################################

#####################
SECTION = "A01" # -------------------->> ADJUSTABLE
TEST_MASK_ID = 96 # -------------------->> ADJUSTABLE
#####################

test_mask_path = masks_dir + f"/{SECTION}_patch_{TEST_MASK_ID}_mask.npy"

# load mask
test_mask = np.load(test_mask_path)

# Plot:
fig, axes = plt.subplots(3, 3, figsize=(12, 12))  # 3x3 grid (for 9 masks)
axes = axes.flatten()  # easier to iterate through

for i in range(test_mask.shape[0]):  # iterate through the 9 classes
  axes[i].imshow(test_mask[i], cmap="gray")
  axes[i].set_title(f"Class {i} - - {reversed_label_codes[i + 1]}")
  axes[i].axis("off")

plt.tight_layout()
plt.show()

## 2. Choose segmentation model

In [None]:
model = smp.Unet(   # -------------------->> ADJUSTABLE
    encoder_name="resnet18",        # choose encoder, e.g. mobilenet_v2 or efficientnet-b7
    encoder_weights="imagenet",     # use `imagenet` pre-trained weights for encoder initialization
    in_channels=3,                  # model input channels (3 for RGB)
    classes=len(label_codes),       # model output channels (number of classes)
)

## 3. Splitting data into training and test sets

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# pre_patches_list
# masks_list

In [None]:
# helper-function in order to extract section and patch_id
def extract_section_and_id(file_name):
    parts = file_name.split("_") # split condition: _
    section = parts[0]  # extract section from file_name, e.g. "A01"
    patch_id = parts[2].replace(".npy.npz", "").replace("_mask", "") #  extract patch_id, e.g. 481
    return section, patch_id

In [None]:
# print section and patch_id from masks and patches
print(extract_section_and_id(masks_list[1]))
print(extract_section_and_id(patches_npz_list[175]))

In [None]:
# group patches by section
section_patches = {} # empty dictionary
for patch in patches_npz_list: # iterate over all preprocessed patches
    section, patch_id = extract_section_and_id(patch) # extract section and id
    section_patches.setdefault(section, []).append((patch, patch_id)) # creates keys of sections with patches and their ids inside

In [None]:
print(section_patches.keys())
# print(section_patches)

In [None]:
# Separate patches in Training and Validation/Test datasets by sections
################################################################################
### HYPERPARAMETER ####
TRAIN_SECTIONS = ["A01", "A02", "A03", "A04", "A05", "A08"]  # Train sections
TEST_SECTIONS = ["A06", "A07"]  # Validation/Test sections
################################################################################

# empty lists for patches
train_data = []
test_data = []

for section, files in section_patches.items(): # iterates through the dictionary of sections
    if section in TRAIN_SECTIONS: # if the section is a training section
        train_data.extend(files) # if yes, the patches are added to the training data
    elif section in TEST_SECTIONS:
        test_data.extend(files)

print(f"Training Patches: {len(train_data)}")
print(f"Test Patches: {len(test_data)}")

In [None]:
train_data[8][0]

In [None]:
def has_mask(patch_name, masks_dir):
    mask_path = os.path.join(masks_dir, patch_name.replace(".npy.npz", "_mask.npy"))
    return os.path.exists(mask_path)

In [None]:
train_with_masks = [f for f in train_data if has_mask(f[0], masks_dir)]
train_background = [f for f in train_data if not has_mask(f[0],masks_dir)]
test_with_masks = [f for f in test_data if has_mask(f[0], masks_dir)]
test_background = [f for f in test_data if not has_mask(f[0], masks_dir)]

print(f"Training mit Masken: {len(train_with_masks)}")
print(f"Training Hintergrund: {len(train_background)}")
print(f"Test mit Masken: {len(test_with_masks)}")
print(f"Test Hintergrund: {len(test_background)}")

In [None]:
from torch.utils.data import Dataset

class PatchDataset(Dataset):
    def __init__(self, data_list, patches_dir, masks_dir=None, transform=None):
        """
        Custom Dataset for loading patches and optional masks.
        Args:
            data_list (list): List of (patch_name, array (image data)) tuples.
            patches_dir (str): Directory containing patch .npy files.
            masks_dir (str): Directory containing mask.npy files (optional).
            transform (callable, optional): Transformation to be applied to the data.
        """
        self.data_list = data_list
        self.patches_dir = patches_dir
        self.masks_dir = masks_dir
        self.transform = transform

    def __len__(self):
        return len(self.data_list)

    def __getitem__(self, idx):
        # Load the patch
        patch_name = self.data_list[idx][0]
        patch_path = os.path.join(self.patches_dir, patch_name)
        patch = np.load(patch_path)

        # Convert to Tensor and change dtype
        patch = torch.tensor(patch, dtype=torch.float32)

        # Load the mask if available
        if self.masks_dir:
            mask_path = os.path.join(self.masks_dir, patch_name.replace(".npy.npz", "_mask.npy"))
            if os.path.exists(mask_path):
                mask = np.load(mask_path)
                mask = torch.tensor(mask, dtype=torch.float32)
            else:
                mask = torch.zeros((patch.shape[1], patch.shape[2]), dtype=torch.float32)  # Background mask
        else:
            mask = torch.zeros((patch.shape[1], patch.shape[2]), dtype=torch.float32)  # Default background mask

        # Apply any transformations if needed
        if self.transform:
            patch, mask = self.transform(patch, mask)

        return patch, mask


### ?.1 Choose optimizer and loss function

In [13]:
loss = smp.losses.DiceLoss(mode="multiclass")
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

## Citing

In [41]:
# @misc{Iakubovskii:2019,
#   Author = {Pavel Iakubovskii},
#   Title = {Segmentation Models Pytorch},
#   Year = {2019},
#   Publisher = {GitHub},
#   Journal = {GitHub repository},
#   Howpublished = {\url{https://github.com/qubvel/segmentation_models.pytorch}}
# }