<a href="https://colab.research.google.com/github/Kopyshevskiy/AN2DL_ch2/blob/main/unet2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
# Mount Google Drive at /gdrive (Colab-only helper).
from google.colab import drive
drive.mount("/gdrive")
# Move into the project folder so that relative paths used later
# (e.g. the dataset archive) resolve against it.
%cd /gdrive/My Drive/AN2DL/Homework2

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).
/gdrive/My Drive/AN2DL/Homework2


In [13]:
# Set seed for reproducibility
seed = 42

# Import necessary libraries
import os

# Set environment variables before importing modules
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
# setting it here presumably does not affect hashing in this process - confirm.
os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['MPLCONFIGDIR'] = os.getcwd() + '/configs/'

# Suppress warnings
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
# NOTE(review): `Warning` is the base class of every warning category, so this
# filter silences all warnings and makes the FutureWarning filter above redundant.
warnings.simplefilter(action='ignore', category=Warning)

# Import necessary modules
import logging
import random
import numpy as np

# Set seeds for random number generators in NumPy and Python
np.random.seed(seed)
random.seed(seed)

# Import TensorFlow and Keras
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.preprocessing.image import ImageDataGenerator


# Set seed for TensorFlow
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

# Reduce TensorFlow verbosity
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

# Print TensorFlow version
print(tf.__version__)

# Import other libraries
# NOTE(review): `os` is already imported at the top of this cell.
import os
import math
from PIL import Image
from keras import backend as K
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.colors as mcolors

from datetime import datetime
import pandas as pd

# Configure plot display settings
sns.set(font_scale=1.4)
sns.set_style('white')
plt.rc('font', size=14)
%matplotlib inline

2.17.1


In [14]:
# Read the dataset archive and pull out the two arrays it is indexed by here
data = np.load("mars_for_students.npz")
training_set = data["training_set"]
X_test = data["test_set"]

# Index 0 along axis 1 holds the images, index 1 the masks;
# 20% of the pairs are held out for validation.
X_train, X_val, y_train, y_val = train_test_split(
    training_set[:, 0], training_set[:, 1], test_size=0.2, random_state=42
)

# Images -> float32, masks -> int32, each with a trailing channel axis appended
X_train, X_val = (arr.astype(np.float32)[..., np.newaxis] for arr in (X_train, X_val))
y_train, y_val = (arr.astype(np.int32)[..., np.newaxis] for arr in (y_train, y_val))

print(f" X_train: {X_train.shape}")
print(f" y_train: {y_train.shape}")
print(f" X_val:   {X_val.shape}")

 X_train: (2092, 64, 128, 1)
 y_train: (2092, 64, 128, 1)
 X_val:   (523, 64, 128, 1)


In [15]:
def geometric_augmentation(X, y, augmenter, num_augmentations=1):
    """Create geometrically augmented (image, mask) pairs.

    For every sample, ``num_augmentations`` random transforms are drawn from
    ``augmenter`` and each one is applied identically to the image and to its
    segmentation mask.

    The mask is warped through a copy of ``augmenter`` whose
    ``interpolation_order`` is forced to 0 (nearest neighbour). Masks hold
    class ids, so any higher-order interpolation would blend neighbouring ids
    into values that belong to neither class. Photometric parameters
    (brightness, channel shift) are never applied to the mask, and the mask is
    not standardized, so it keeps its original dtype.

    Args:
        X: Indexable collection of images, each shaped like a single sample
            (no batch axis).
        y: Indexable collection of masks aligned with ``X``.
        augmenter: Object exposing ``get_random_transform(shape)``,
            ``apply_transform(x, params)``, ``standardize(x)`` and an
            ``interpolation_order`` attribute (the Keras ``ImageDataGenerator``
            interface).
        num_augmentations: Number of augmented copies generated per sample.

    Returns:
        Tuple ``(augmented_X, augmented_y)`` of NumPy arrays with
        ``len(X) * num_augmentations`` entries each. Images are float32;
        masks keep the dtype of ``y``.
    """
    import copy

    # Shallow copy: same transform configuration, nearest-neighbour resampling.
    mask_augmenter = copy.copy(augmenter)
    mask_augmenter.interpolation_order = 0

    # Transform parameters that alter pixel intensities and must not touch labels.
    photometric_keys = ("brightness", "channel_shift_intensity")

    augmented_X, augmented_y = [], []

    for i in range(len(X)):

        img = np.asarray(X[i])
        mask = np.asarray(y[i])

        for _ in range(num_augmentations):

            # One set of random parameters shared by the image and its mask.
            params = augmenter.get_random_transform(img.shape)
            mask_params = {
                key: value for key, value in params.items()
                if key not in photometric_keys
            }

            # astype() copies, so in-place work inside the augmenter cannot
            # leak back into the caller's array.
            aug_img = augmenter.apply_transform(img.astype(np.float32), params)
            aug_img = augmenter.standardize(aug_img)
            aug_mask = mask_augmenter.apply_transform(mask, mask_params)

            augmented_X.append(aug_img)
            augmented_y.append(aug_mask)

    return np.array(augmented_X), np.array(augmented_y)


In [16]:
# Geometric augmentation policy: rotation, shifts, shear, zoom and horizontal flip
augmenter = ImageDataGenerator(
    horizontal_flip=True,
    zoom_range=0.1,
    shear_range=0.1,
    height_shift_range=0.1,
    width_shift_range=0.1,
    rotation_range=20,
)

# Rebind the training arrays to the augmented set (5 variants per input sample)
X_train, y_train = geometric_augmentation(
    X_train,
    y_train,
    augmenter,
    num_augmentations=5,
)

print(f" X_train: {X_train.shape}")
print(f" y_train: {y_train.shape}")

 X_train: (10460, 64, 128, 1)
 y_train: (10460, 64, 128, 1)


In [17]:
def photometric_augmentation(X, y, augmenter, num_augmentations=1):
    """Produce augmented images paired with their untouched labels.

    Each sample of ``X`` is wrapped into a batch of one and fed to
    ``augmenter.flow``; the first batch yielded is kept. This is repeated
    ``num_augmentations`` times per sample, and the matching entry of ``y`` is
    stored unchanged next to every augmented image.

    Returns:
        Tuple ``(images, labels)`` of NumPy arrays built from the collected
        samples.
    """
    images_out, labels_out = [], []

    for idx, sample in enumerate(X):
        # Prepend a batch axis of length 1, as required by flow()
        batch_of_one = sample.reshape((1,) + sample.shape)
        target = y[idx]

        for _ in range(num_augmentations):
            # Take only the first batch the generator produces
            first_batch = next(iter(augmenter.flow(batch_of_one, batch_size=1)), None)
            if first_batch is not None:
                images_out.append(first_batch[0])
                labels_out.append(target)

    return np.array(images_out), np.array(labels_out)

In [18]:
# Photometric augmentation policy: only the brightness range is configured
augmenter = ImageDataGenerator(brightness_range=[0.2, 1.8])

# Rebind the training arrays to one brightness-shifted variant per sample
X_train, y_train = photometric_augmentation(
    X_train,
    y_train,
    augmenter,
    num_augmentations=1,
)

print(f" X_train: {X_train.shape}")
print(f" y_train: {y_train.shape}")

 X_train: (10460, 64, 128, 1)
 y_train: (10460, 64, 128, 1)


In [20]:
# import tensorflow as tf
# from tensorflow.keras import layers, models, Input


# def residual_block(x, filters, dropout_rate=0.0):
#     shortcut = x

#     # First Conv -> BN -> ReLU
#     x = layers.Conv2D(filters, 3, padding='same', kernel_initializer='he_normal')(x)
#     x = layers.BatchNormalization()(x)
#     x = layers.ReLU()(x)

#     # Second Conv -> BN
#     x = layers.Conv2D(filters, 3, padding='same', kernel_initializer='he_normal')(x)
#     x = layers.BatchNormalization()(x)

#     # Add the shortcut connection
#     # Adjust the number of filters in the shortcut to match x
#     shortcut = layers.Conv2D(filters, 1, padding='same', kernel_initializer='he_normal')(shortcut)
#     x = layers.Add()([x, shortcut])
#     x = layers.ReLU()(x)

#     # Optional Dropout
#     if dropout_rate > 0.0:
#         x = layers.Dropout(dropout_rate)(x)

#     return x


# def unet_model_with_bn_rb():
#     inputs = Input(shape=(64, 128, 1))

#     # Encoder with Residual Blocks
#     conv1 = layers.Conv2D(64, 3, padding='same', kernel_initializer='he_normal')(inputs)
#     conv1 = residual_block(conv1, 64, dropout_rate=0.1)
#     mpool1 = layers.MaxPool2D()(conv1)

#     conv2 = layers.Conv2D(128, 3, padding='same', kernel_initializer='he_normal')(mpool1)
#     conv2 = residual_block(conv2, 128, dropout_rate=0.2)
#     mpool2 = layers.MaxPool2D()(conv2)

#     conv3 = layers.Conv2D(256, 3, padding='same', kernel_initializer='he_normal')(mpool2)
#     conv3 = residual_block(conv3, 256, dropout_rate=0.3)
#     mpool3 = layers.MaxPool2D()(conv3)

#     conv4 = layers.Conv2D(512, 3, padding='same', kernel_initializer='he_normal')(mpool3)
#     conv4 = residual_block(conv4, 512, dropout_rate=0.4)
#     mpool4 = layers.MaxPool2D()(conv4)

#     # Bottleneck
#     conv5 = layers.Conv2D(1024, 3, padding='same', kernel_initializer='he_normal')(mpool4)
#     conv5 = residual_block(conv5, 1024, dropout_rate=0.5)

#     # Decoder with Residual Blocks
#     up6 = layers.Conv2DTranspose(512, 2, strides=2, kernel_initializer='he_normal', padding='same')(conv5)
#     conv6 = layers.Concatenate()([up6, conv4])
#     conv6 = residual_block(conv6, 512, dropout_rate=0.4)

#     up7 = layers.Conv2DTranspose(256, 2, strides=2, kernel_initializer='he_normal', padding='

In [21]:
import tensorflow as tf
from tensorflow.keras import layers, models, Input

def _residual_block(x, filters, dropout_rate):
    """Apply one residual block and return its output tensor.

    Layers are created in this order: Conv3x3 -> BatchNorm -> ReLU ->
    Conv3x3 -> BatchNorm -> Conv1x1 on the block input (shortcut, to match the
    channel count) -> Add -> ReLU -> Dropout.
    """
    block_input = x

    x = layers.Conv2D(filters, 3, padding='same', kernel_initializer='he_normal')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(filters, 3, padding='same', kernel_initializer='he_normal')(x)
    x = layers.BatchNormalization()(x)

    # 1x1 projection so the shortcut has `filters` channels before the addition
    shortcut = layers.Conv2D(filters, 1, padding='same', kernel_initializer='he_normal')(block_input)
    x = layers.Add()([shortcut, x])
    x = layers.ReLU()(x)
    x = layers.Dropout(dropout_rate)(x)
    return x


def unet_model_with_bn_rb(input_shape=(64, 128, 1), num_classes=5):
    """Build a U-Net whose stages are residual blocks with batch normalization.

    The encoder has four residual stages (64, 128, 256, 512 filters with
    dropout 0.1, 0.2, 0.3, 0.4), each followed by 2x2 max pooling. The
    bottleneck uses 1024 filters with dropout 0.5. The decoder mirrors the
    encoder: a stride-2 transposed convolution, concatenation with the matching
    encoder output, then a residual block. A final 1x1 softmax convolution
    produces per-pixel class probabilities.

    Args:
        input_shape: ``(height, width, channels)`` of the input images. Height
            and width must be divisible by 16 because the encoder halves the
            resolution four times.
        num_classes: Number of output channels of the softmax layer.

    Returns:
        An uncompiled Keras ``Model``.

    Raises:
        ValueError: If a known spatial dimension is not divisible by 16.
    """
    for size in input_shape[:2]:
        if size is not None and size % 16 != 0:
            raise ValueError(
                f"Spatial dimensions must be divisible by 16, got input_shape={input_shape}"
            )

    # (filters, dropout rate) for each encoder stage; the decoder walks it in reverse
    stage_specs = [(64, 0.1), (128, 0.2), (256, 0.3), (512, 0.4)]

    inputs = Input(shape=input_shape)

    # Encoder
    x = inputs
    skips = []
    for filters, dropout_rate in stage_specs:
        x = _residual_block(x, filters, dropout_rate)
        skips.append(x)  # kept for the skip connection of the mirrored decoder stage
        x = layers.MaxPool2D()(x)

    # Bottleneck
    x = _residual_block(x, 1024, 0.5)

    # Decoder
    for (filters, dropout_rate), skip in zip(reversed(stage_specs), reversed(skips)):
        x = layers.Conv2DTranspose(filters, 2, strides=2, kernel_initializer='he_normal', padding='same')(x)
        x = layers.Concatenate()([x, skip])
        x = _residual_block(x, filters, dropout_rate)

    outputs = layers.Conv2D(num_classes, 1, activation='softmax', kernel_initializer='he_normal')(x)

    model = models.Model(inputs=inputs, outputs=outputs)
    return model


In [22]:
# Instantiate the network; it is not compiled at this point.
model = unet_model_with_bn_rb()

In [23]:
class MeanIntersectionOverUnion(tf.keras.metrics.MeanIoU):
    """Mean IoU computed on class predictions, skipping selected true labels.

    ``y_pred`` is reduced with ``argmax`` over its last axis, both tensors are
    flattened, and every position whose true label is listed in
    ``labels_to_exclude`` is dropped before the parent ``MeanIoU`` state is
    updated.

    Args:
        num_classes: Number of classes, forwarded to ``MeanIoU``.
        labels_to_exclude: Iterable of true-label values to ignore. Defaults
            to ``[0]``.
        name: Metric name.
        dtype: Metric dtype, forwarded to ``MeanIoU``.
    """

    def __init__(self, num_classes, labels_to_exclude=None, name="mean_iou", dtype=None):
        super().__init__(num_classes=num_classes, name=name, dtype=dtype)
        if labels_to_exclude is None:
            labels_to_exclude = [0]  # Default to excluding label 0
        # Copy so that later changes to the caller's list do not alter the metric
        self.labels_to_exclude = list(labels_to_exclude)

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate statistics from one batch, ignoring excluded true labels."""
        # Convert predictions to class labels
        y_pred = tf.math.argmax(y_pred, axis=-1)

        # Flatten the tensors
        y_true = tf.reshape(y_true, [-1])
        y_pred = tf.reshape(y_pred, [-1])

        # Single mask that is False wherever the true label is excluded
        keep = tf.ones_like(y_true, dtype=tf.bool)
        for label in self.labels_to_exclude:
            keep = tf.logical_and(keep, tf.not_equal(y_true, label))

        # Weights must be filtered with the same mask, otherwise their shape
        # no longer matches the filtered labels.
        if sample_weight is not None:
            sample_weight = tf.reshape(tf.convert_to_tensor(sample_weight), [-1])
            sample_weight = tf.broadcast_to(sample_weight, tf.shape(y_true))
            sample_weight = tf.boolean_mask(sample_weight, keep)

        y_true = tf.boolean_mask(y_true, keep)
        y_pred = tf.boolean_mask(y_pred, keep)

        # Update the state
        return super().update_state(y_true, y_pred, sample_weight)

    def get_config(self):
        """Return only the arguments accepted by ``__init__``.

        The inherited config carries parent-only keys that this constructor
        does not take, so it is replaced rather than extended.
        """
        return {
            "num_classes": self.num_classes,
            "labels_to_exclude": list(self.labels_to_exclude),
            "name": self.name,
            "dtype": self.dtype,
        }

# Compile with AdamW, sparse categorical cross-entropy on integer masks, and
# two metrics: pixel accuracy and mean IoU with true label 0 excluded.
model.compile(
    optimizer=tf.keras.optimizers.AdamW(learning_rate=1e-3),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=[
        "accuracy",
        MeanIntersectionOverUnion(num_classes=5, labels_to_exclude=[0]),
    ],
)

In [24]:
# Stop training once validation accuracy has not improved for 10 epochs and
# roll the model back to the best weights seen.
# NOTE(review): this monitors 'val_accuracy', while the model is also compiled
# with a 'mean_iou' metric that excludes label 0 - confirm which one should
# drive early stopping.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    mode='max',
    patience=10,
    restore_best_weights=True
)

In [None]:
# Train for up to 100 epochs, validating on the held-out split after each one.
# `callbacks` is documented as a list of callback instances, so the single
# early-stopping callback is wrapped in a list.
# Only the per-epoch metrics dictionary of the returned History object is kept.
history = model.fit(
    x=X_train,
    y=y_train,
    epochs=100,
    callbacks=[early_stopping],
    validation_data=(X_val, y_val),
    verbose=1
).history

Epoch 1/100
[1m325/327[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 38ms/step - accuracy: 0.2735 - loss: 16.4371 - mean_iou: 0.0976