# Imports

In [None]:
import rasterio
import geopandas as gpd
from rasterio.features import rasterize
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
import random

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Input, MaxPooling2D, Dropout, Conv2DTranspose, concatenate
from tensorflow.keras import layers, models, optimizers, losses, metrics
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import backend as K

In [None]:
def read_and_rasterize_image(tif_path, shape_file_path):
    with rasterio.open(tif_path) as src:
        image = src.read([1, 2, 3])  # Reading the first three bands (assuming RGB)
        transform = src.transform

    # Read the shapefile
    shapes = gpd.read_file(shape_file_path)

    # Rasterize the shapefile to create a mask
    mask = rasterize(
        [(shape, 1) for shape in shapes.geometry],
        out_shape=image[0].shape,
        transform=transform,
        fill=0,
        all_touched=True,
        dtype='uint8'
    )
    

    # Normalize the image
    image = image / 255.0
    return image.transpose((1, 2, 0)), mask

In [None]:
def divide_into_patches(image, mask, patch_size):
    """
    Divides an image and its mask into smaller patches.

    Parameters:
    - image: The image to be divided (numpy array).
    - mask: The corresponding mask for the image (numpy array).
    - patch_size: The size of each patch (tuple of two integers).

    Returns:
    - image_patches: A list of image patches.
    - mask_patches: A list of mask patches.
    """
    # Ensure the input dimensions are compatible
    assert image.shape[0:2] == mask.shape[0:2], "Image and mask must have the same dimensions"
    
    # Calculate the number of patches along each dimension
    patches_along_height = image.shape[0] // patch_size[0]
    patches_along_width = image.shape[1] // patch_size[1]

    image_patches = []
    mask_patches = []

    for i in range(patches_along_height):
        for j in range(patches_along_width):
            # Calculate patch coordinates
            start_row = i * patch_size[0]
            end_row = start_row + patch_size[0]
            start_col = j * patch_size[1]
            end_col = start_col + patch_size[1]

            # Extract patches
            image_patch = image[start_row:end_row, start_col:end_col]
            mask_patch = mask[start_row:end_row, start_col:end_col]

            image_patches.append(image_patch)
            mask_patches.append(mask_patch)

    return np.array(image_patches), np.array(mask_patches)

def reshape_into_patched(image, mask, patch_size):
    patch_size = list(patch_size)
    mosaic_size = tuple(patch_size.insert(0, -1))
    return image.reshape(mosaic_size), mask_patches.reshape(mosaic_size)

In [None]:
file_names = ["2022_131000_456000_RGB_hrl", "2022_133000_456000_RGB_hrl", "2022_136000_457000_RGB_hrl", "2022_129000_458000_RGB_hrl", "2022_131000_455000_RGB_hrl"]
patch_size = (512, 512)

def pre_process(file_names):
    folds = {}
    for i, file_name in enumerate(file_names):
        mask_path = f"masks/{file_name}.gpkg"
        tif_path = f"nl_8cm/{file_name}.tif"
        image, mask = read_and_rasterize_image(tif_path, mask_path)
        image, mask = divide_into_patches(image=image, mask=mask, patch_size=(512, 512))
        # image, mask = reshape_into_patched(image=image, mask=mask, patch_size=(500, 500))
        folds[i] = {"images": image, "masks": mask}
    return folds

In [None]:
def show_images_masks(folds, n_images = 10):
    fold = 0
    
    num_images = len(folds[fold]['images'])
    indices = np.random.choice(range(num_images), size=n_images, replace=False)
    
    # Set up the matplotlib figure and axes
    fig, axs = plt.subplots(10, 2, figsize=(10, 40))  # Adjust figsize as needed
    
    for i, idx in enumerate(indices):
        image = folds[fold]["images"][idx] # Load the image
        mask = folds[fold]["masks"][idx]  # Load the corresponding mask
    
        axs[i, 0].imshow(image, cmap='gray')
        axs[i, 0].axis('off')  # Remove axis ticks and labels
        axs[i, 0].set_title(f"Image {idx}")
    
        axs[i, 1].imshow(mask, cmap='gray')
        axs[i, 1].axis('off')
        axs[i, 1].set_title(f"Mask {idx}")
    
    plt.tight_layout()
    plt.show()

In [None]:
def create_sets_from_folds(folds):
    train_folds = [0, 1, 2, 3]
    test_fold = 4
    
    X_train = np.concatenate([folds[i]['images'] for i in train_folds], axis=0)
    y_train = np.concatenate([folds[i]['masks'] for i in train_folds], axis=0)
    X_test, y_test = folds[test_fold]['images'], folds[test_fold]['masks']
    return (X_train, y_train), (X_test, y_test)

In [None]:
batch_size=16
train_dataset_save_path = 'train_dataset'
test_dataset_save_path = 'test_dataset'

In [None]:
# Uncomment for preprocessing
# folds = pre_process(file_names)
# print("Loaded all images & mosaiced them.")

# # show_images_masks(folds)

# (X_train, y_train), (X_test, y_test) = create_sets_from_folds(folds)
# print("Created folds and put them together as a train & test sets.")
# print(
#     X_train.shape,
#     y_train.shape,
#     X_test.shape
# )

# # Convert data to tf.data.Dataset and cast to float32
# train_dataset = tf.data.Dataset.from_tensor_slices((X_train.astype(np.float32), y_train.astype(np.uint8)))
# test_dataset = tf.data.Dataset.from_tensor_slices((X_test.astype(np.float32), y_test.astype(np.uint8)))
# print("Converted sets to tf.Datasets and casted to float32.")

# # Shuffle, batch, and prefetch the training dataset
# train_dataset = train_dataset.shuffle(buffer_size=len(X_train)).prefetch(buffer_size=tf.data.AUTOTUNE)

# # Batch and prefetch the test dataset
# test_dataset = test_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
# print("Buffered & batched sets.")

# # Save training dataset
# train_dataset_save_path = 'train_dataset'
# tf.data.Dataset.save(train_dataset, train_dataset_save_path) # options=compression_opts
# print("Saved Train set.")

# # Save test dataset
# test_dataset_save_path = 'test_dataset'
# tf.data.Dataset.save(test_dataset, test_dataset_save_path) # options=compression_opts
# print("Saved Test set.")


# del folds
# del X_train
# del y_train
# del X_test
# del y_test

In [None]:
# Load training dataset
train_dataset = tf.data.Dataset.load(train_dataset_save_path, 
                                           element_spec=(tf.TensorSpec(shape=(patch_size[0], patch_size[1], 3), dtype=tf.float32),
                                                         tf.TensorSpec(shape=(patch_size[0], patch_size[1]), dtype=tf.uint8)))

# Load test dataset
test_dataset = tf.data.Dataset.load(test_dataset_save_path, 
                                          element_spec=(tf.TensorSpec(shape=(patch_size[0], patch_size[1], 3), dtype=tf.float32),
                                                        tf.TensorSpec(shape=(patch_size[0], patch_size[1]), dtype=tf.uint8)))

In [None]:
train_dataset = train_dataset.batch(batch_size)
test_dataset = test_dataset.batch(batch_size)

In [None]:
train_dataset.element_spec, test_dataset.element_spec

In [None]:
def iou_loss(y_true, y_pred):
    intersection = tf.reduce_sum(y_true * y_pred)
    union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) - intersection
    iou_score = intersection / (union + tf.keras.backend.epsilon())  # Add epsilon to avoid division by zero
    return 1.0 - iou_score  # Minimize 1 - IoU

In [None]:
def conv_block(inputs, num_filters):
    x = layers.Conv2D(num_filters, 3, padding="same")(inputs)
    x = layers.BatchNormalization()(x)
    x = tf.keras.activations.relu(x)
    x = layers.Conv2D(num_filters, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = tf.keras.activations.relu(x)
    return x

def downsample_block(inputs, num_filters):
    x = conv_block(inputs, num_filters)
    p = layers.MaxPooling2D((2, 2))(x)
    return x, p

def upsample_block(inputs, skip_features, num_filters):
    x = layers.Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(inputs)
    x = layers.Concatenate()([x, skip_features])
    x = conv_block(x, num_filters)
    return x

def build_unet(input_shape):
    inputs = layers.Input(input_shape)

    s1, p1 = downsample_block(inputs, 64) # 512 -> 256
    s2, p2 = downsample_block(p1, 128) # 256 -> 128
    s3, p3 = downsample_block(p2, 256) # 128 -> 64

    b = conv_block(p3, 256)

    d1 = upsample_block(b, s3, 256) # 64 -> 128
    d2 = upsample_block(d1, s2, 128) # 128 -> 256
    d3 = upsample_block(d2, s1, 64) # 256 -> 512

    outputs = layers.Conv2D(1, (1, 1), padding="same", activation="sigmoid")(d3)

    model = models.Model(inputs, outputs, name="U-Net")
    return model


model = build_unet(input_shape=(patch_size[0], patch_size[1], 3))
model.summary()

In [None]:
model.compile(optimizer=optimizers.Adam(), 
              loss=iou_loss,
              metrics=[metrics.BinaryAccuracy(), metrics.IoU(num_classes=2, target_class_ids=[1])])
# Loss vervang met iou

In [None]:
model_checkpoint_callback = ModelCheckpoint(
    filepath="model.keras",
    save_weights_only=False,  # Set to True to save only weights, False to save the whole model
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1)  # Log a message whenever the model is saved

In [None]:
# Now, fit the model using these datasets
history = model.fit(
    train_dataset,
    epochs=10,
    validation_data=test_dataset,
    callbacks=[model_checkpoint_callback],
    verbose=1)

In [None]:
model.save("model.keras")

In [None]:
# # Now, include this callback in your model.fit() call
# history = model.fit(
#     X_train, y_train,
#     epochs=10,
#     batch_size=64,
#     validation_split=0.25, # Should be a individual fold
#     callbacks=[model_checkpoint_callback],  # Include the callback here
#     verbose=1)

In [None]:
# Extracting the history of training and validation loss
loss = history.history['loss']
val_loss = history.history['val_loss']

# Extracting the history of training and validation accuracy
# Note: Replace 'accuracy' with 'acc' if your Keras version uses 'acc' instead
accuracy = history.history['binary_accuracy']
val_accuracy = history.history['val_binary_accuracy']

# Setting up the subplot for loss
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

# Setting up the subplot for accuracy
plt.subplot(1, 2, 2)
plt.plot(accuracy, label='Training Accuracy')
plt.plot(val_accuracy, label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.show()

In [None]:
# Load the saved model from the file
model = load_model('model.keras', custom_objects={'iou_loss': iou_loss})

In [None]:
y_pred = model.predict(test_dataset)

In [None]:
# Define function to visualize dataset samples
def visualize_sample(image, mask, prediction):
    fig, ax = plt.subplots(1, 3, figsize=(10, 5))

    ax[0].imshow(image.numpy(), cmap='gray')
    ax[0].set_title('Image')
    
    ax[1].imshow(mask.numpy(), cmap='gray')
    ax[1].set_title('Mask')
    
    ax[2].imshow(prediction, cmap='gray')
    ax[2].set_title('Prediction')
    
    plt.show()

# # Visualize samples
# for i, (image, mask) in enumerate(test_dataset.unbatch().take(3)):
#     visualize_sample(image, mask, y_pred[i])

# Randomly select 10 samples from the dataset
test_dataset_array = test_dataset.unbatch().take(10)

# Predict masks for the selected samples
test_pred = model.predict(test_dataset_array.batch(1))  # Ensure batch size is 1 for individual predictions

# Visualize samples along with their predictions
for (image, mask), pred in zip(test_dataset_array, test_pred):
    visualize_sample(image, mask, pred)

In [None]:
test_dataset

In [None]:
fig, ax = plt.subplots(1, 3, figsize=(10, 5))

id_ = random.randint(0, y_pred.shape[0])

ax[0].imshow(X_test[id_], cmap='gray')
ax[0].set_title('Image')
ax[1].imshow(y_test[id_], cmap='gray')
ax[1].set_title('Mask')
ax[2].imshow(y_pred[id_], cmap='gray')
ax[2].set_title('Pred')
plt.show()