# TUNet TL Exp 3

In [None]:
'''
IMPORTS
'''
import tensorflow as tf
from tensorflow.image import resize, ResizeMethod

import keras
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, concatenate, Conv2DTranspose, Dropout
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

import random
import os
import time
from numpy import load
import numpy as np
import matplotlib.pyplot as plt

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell

shell = TerminalInteractiveShell.instance()

'''
DATA PATHS
'''
TOP_DIR = '/tf/Notebooks/Iwashita'

TRAIN_DIR = TOP_DIR + '/Data/Preprocessed_wAugmentation/Experiment3/Train'
VAL_DIR = TOP_DIR + '/Data/Preprocessed_wAugmentation/Experiment3/Validate'
TEST_DIR = TOP_DIR + '/Data/Preprocessed_wAugmentation/Experiment3/Test'

'''
OUTPUTS PATH
'''
WEIGHTS_PATH = TOP_DIR + '/Output/Weights/'
METRICS_PATH = TOP_DIR + '/Output/Metrics/'

'''
GPU
'''
gpu_p40 = '/device:GPU:1'
gpu_1660 = '/device:GPU:0'

!nvidia-smi

'''
TRAINING DATA
'''
print(TRAIN_DIR)
!cd /tf/Notebooks/Iwashita/Data/Preprocessed_wAugmentation/Experiment2/Train && ls

exp3_rgb_X_train = load(TRAIN_DIR + '/exp3_rgb_X_train.npy')
exp3_ir_X_train = load(TRAIN_DIR + '/exp3_ir_X_train.npy')
exp3_y_train = load(TRAIN_DIR + '/exp3_y_train.npy')

'''
VALIDATION DATA
'''
exp3_rgb_X_val = load(VAL_DIR + '/exp3_rgb_X_val.npy')
exp3_ir_X_val = load(VAL_DIR + '/exp3_ir_X_val.npy')
exp3_y_val =load(VAL_DIR + '/exp3_y_val.npy')

'''
TEST DATA
'''
exp3_rgb_X_test_a = load(TEST_DIR + '/exp3_rgb_X_test_a.npy')
exp3_ir_X_test_a = load(TEST_DIR + '/exp3_ir_X_test_a.npy')
exp3_y_test_a =load(TEST_DIR + '/exp3_y_test_a.npy')

exp3_rgb_X_test_b = load(TEST_DIR + '/exp3_rgb_X_test_b.npy')
exp3_ir_X_test_b = load(TEST_DIR + '/exp3_ir_X_test_b.npy')
exp3_y_test_b =load(TEST_DIR + '/exp3_y_test_b.npy')

'''
INTERSECTION OVER UNION
'''
def iou(y_true, y_pred, num_classes):
    intersection = np.histogram2d(y_true.flatten(), y_pred.flatten(), bins=num_classes)[0]
    area_true = np.histogram(y_true, bins=num_classes)[0]
    area_pred = np.histogram(y_pred, bins=num_classes)[0]
    area_true = np.expand_dims(area_true, -1)
    area_pred = np.expand_dims(area_pred, 0)

    union = area_true + area_pred - intersection

    union[union == 0] = 1e-9
    iou = intersection / union

    return iou, np.mean(np.diag(iou))

'''
PIXEL ACCURACY
'''
def pixel_accuracy(y_true, y_pred):
    return np.sum(y_true == y_pred) / y_true.size

'''
MEAN ACCURACY
'''
def mean_accuracy(y_true, y_pred, num_classes):
    intersection = np.histogram2d(y_true.flatten(), y_pred.flatten(), bins=num_classes)[0]
    area_true = np.histogram(y_true, bins=num_classes)[0]

    area_true[area_true == 0] = 1e-9
    accuracy = np.diag(intersection) / area_true

    return np.mean(accuracy)

'''
FREQUENCY-WEIGHTED INTERSECTION OVER UNION
'''
def fw_iou(y_true, y_pred, num_classes):
    intersection = np.histogram2d(y_true.flatten(), y_pred.flatten(), bins=num_classes)[0]
    area_true = np.histogram(y_true, bins=num_classes)[0]
    area_pred = np.histogram(y_pred, bins=num_classes)[0]
    area_true = np.expand_dims(area_true, -1)
    area_pred = np.expand_dims(area_pred, 0)

    union = area_true + area_pred - intersection

    union[union == 0] = 1e-9
    iou = intersection / union
    fw_iou = np.sum(area_true * iou) / np.sum(area_true)

    return fw_iou

def display_one_hot_annotation(annotations_onehot):
    label = np.argmax(annotations_onehot, axis=-1)
    cmap = plt.get_cmap('tab10', 7)

    plt.imshow(label, cmap=cmap)
    plt.colorbar(ticks=range(num_classes), format=plt.FuncFormatter(lambda val, loc: {
        0: "unlabeled",
        1: "sand",
        2: "soil",
        3: "ballast",
        4: "rock",
        5: "bedrock",
        6: "rocky terrain"
    }[val]))
    plt.show()

print("Done")

In [None]:
'''
MODEL PARAMS
'''
BATCH_SIZE = 2
EPOCHS = 1000 

LEARNING_RATE = 1e-4
PATIENCE = 15
FACTOR = 0.1

EXP3_FILENAME = "augmented_tunet_tl_exp3_batch{}_epoch{}_lr{}_p{}_f{}".format(
    BATCH_SIZE, EPOCHS, LEARNING_RATE, PATIENCE, FACTOR)

print("Done")

### TUNet TL Common

In [None]:
'''
MODEL
'''
def ContractionPath(inputs, _padding='same', _activation='relu'):
    c1 = Conv2D(64, (3, 3), activation="relu", padding="same")(inputs)
    c1 = Conv2D(64, (3, 3), activation="relu", padding="same")(c1)
    p1 = MaxPooling2D((2, 2))(c1)
    p1 = Dropout(0.25)(p1)

    c2 = Conv2D(128, (3, 3), activation="relu", padding="same")(p1)
    c2 = Conv2D(128, (3, 3), activation="relu", padding="same")(c2)
    p2 = MaxPooling2D((2, 2))(c2)
    p2 = Dropout(0.5)(p2)

    c3 = Conv2D(256, (3, 3), activation="relu", padding="same")(p2)
    c3 = Conv2D(256, (3, 3), activation="relu", padding="same")(c3)
    p3 = MaxPooling2D((2, 2))(c3)
    p3 = Dropout(0.5)(p3)

    c4 = Conv2D(512, (3, 3), activation="relu", padding="same")(p3)
    c4 = Conv2D(512, (3, 3), activation="relu", padding="same")(c4)
    p4 = MaxPooling2D((2, 2))(c4)
    p4 = Dropout(0.5)(p4)
    
    '''
    RETURN
    '''
    return c1, c2, c3, c4, p4

def ExpansionPath(c1, c2, c3, c4, p4, _padding='same', _activation='relu'):    
    cm = Conv2D(1024, (3, 3), activation="relu", padding="same")(p4)
    cm = Conv2D(1024, (3, 3), activation="relu", padding="same")(cm)
    
    deconv4 = Conv2DTranspose(512, (3, 3), strides=(2, 2), padding="same")(cm)
    c4 = resize(c4, (deconv4.shape[1], deconv4.shape[2]), method=ResizeMethod.BILINEAR)
    uconv4 = concatenate([deconv4, c4])
    uconv4 = Dropout(0.5)(uconv4)
    uconv4 = Conv2D(512, (3, 3), activation="relu", padding="same")(uconv4)
    uconv4 = Conv2D(512, (3, 3), activation="relu", padding="same")(uconv4)

    deconv3 = Conv2DTranspose(256, (3, 3), strides=(2, 2), padding="same")(uconv4)
    c3 = resize(c3, (deconv3.shape[1], deconv3.shape[2]), method=ResizeMethod.BILINEAR)
    uconv3 = concatenate([deconv3, c3])
    uconv3 = Dropout(0.5)(uconv3)
    uconv3 = Conv2D(256, (3, 3), activation="relu", padding="same")(uconv3)
    uconv3 = Conv2D(256, (3, 3), activation="relu", padding="same")(uconv3)

    deconv2 = Conv2DTranspose(128, (3, 3), strides=(2, 2), padding="same")(uconv3)
    c2 = resize(c2, (deconv2.shape[1], deconv2.shape[2]), method=ResizeMethod.BILINEAR)
    uconv2 = concatenate([deconv2, c2])
    uconv2 = Dropout(0.5)(uconv2)
    uconv2 = Conv2D(128, (3, 3), activation="relu", padding="same")(uconv2)
    uconv2 = Conv2D(128, (3, 3), activation="relu", padding="same")(uconv2)

    deconv1 = Conv2DTranspose(64, (3, 3), strides=(2, 2), padding="same")(uconv2)
    c1 = resize(c1, (deconv1.shape[1], deconv1.shape[2]), method=ResizeMethod.BILINEAR)
    uconv1 = concatenate([deconv1, c1])
    uconv1 = Dropout(0.5)(uconv1)
    uconv1 = Conv2D(64, (3, 3), activation="relu", padding="same")(uconv1)
    uconv1 = Conv2D(64, (3, 3), activation="relu", padding="same")(uconv1)
    
    return uconv1

def TUNet_TL():
    '''
    INPUT
    '''
    # Expected inputs are an RGB and an IR image
    input_rgb = Input((572, 572, 3))
    input_ir = Input((572, 572))
    input_ir = tf.expand_dims(input_ir, axis=-1)
    
    '''
    CONTRACTION PATH
    '''
    c1_rgb, c2_rgb, c3_rgb, c4_rgb, p4_rgb = ContractionPath(input_rgb)
    c1_ir, c2_ir, c3_ir, c4_ir, p4_ir = ContractionPath(input_ir)
    
    '''
    EXPANSION PATH
    '''
    cout_rgb = ExpansionPath(c1_rgb, c2_rgb, c3_rgb, c4_rgb, p4_rgb)
    cout_ir = ExpansionPath(c1_ir, c2_ir, c3_ir, c4_ir, p4_ir)
    
    cout_cat = concatenate([cout_rgb, cout_ir], axis=-1)

    '''
    OUTPUT
    '''
    softmax_output = Conv2D(7, (1, 1), padding="same", activation='softmax')(cout_cat)
    softmax_output = resize(softmax_output, (572, 572), method=ResizeMethod.BILINEAR)

    '''
    RETURN
    '''
    return Model(inputs=[input_rgb, input_ir], outputs=softmax_output)

'''
TRAIN
'''
def TUNet_Train(model, weights_filename, 
               X_rgb_train, X_ir_train, y_train, 
               X_rgb_val, X_ir_val, y_val):

    try:
        with tf.device(gpu_p40):
            
            model.compile(
                optimizer=Adam(learning_rate=LEARNING_RATE), 
                loss='categorical_crossentropy', 
                metrics=['accuracy'])

            callbacks = [
                ModelCheckpoint(weights_filename, save_best_only=True, save_weights_only=True, verbose=1),
                EarlyStopping(patience=PATIENCE, verbose=1),
                ReduceLROnPlateau(factor=FACTOR, patience=PATIENCE, min_lr=LEARNING_RATE, verbose=1)]
                
            history = model.fit(
                [X_rgb_train, X_ir_train], y_train,
                validation_data=([X_rgb_val, X_ir_val], y_val),
                batch_size=BATCH_SIZE, epochs=EPOCHS, callbacks=callbacks, verbose=2)
        
    except RuntimeError as e:
        print(e)
        
'''
SCORE
'''
def TUNet_Score(model, weights_filename, metrics_filename, 
                X_rgb_test, X_ir_test, y_test):
    
    try:
        with tf.device(gpu_p40):
            model.load_weights(weights_filename)

            score = model.evaluate([X_rgb_test, X_ir_test], y_test, batch_size=2, verbose=1)

            print("Test loss:", score[0])
            print("Test accuracy:", score[1])

            with open(metrics_filename, "a") as f:
                f.write(f"\Test Loss: {score[0]}\n")
                f.write(f"\Test Accuracy: {score[1]}\n")
                
    except RuntimeError as e:
        print(e)

'''
PREDICT
'''
def TUNet_Predict(model, X_rgb_test, X_ir_test):
    try:
        with tf.device(gpu_p40):
            pred = model.predict([X_rgb_test, X_ir_test], batch_size=2)
            
    except RuntimeError as e:
        print(e)
        
    return pred

'''
DISPLAY RANDOM RESULT
'''
def TUNet_Display(X_rgb_test, X_ir_test, y_test, y_pred):
    fig, axes = plt.subplots(1, 4, figsize=(10, 6))
    n = random.randint(0, len(X_rgb_test)-1)
    cmap = plt.get_cmap('tab10', 7)
    
    axes[0].imshow(X_rgb_test[n])
    axes[1].imshow(X_ir_test[n], cmap='gray')
    axes[2].imshow(np.argmax(y_test[n], axis=-1), cmap=cmap)
    axes[3].imshow(np.argmax(y_pred[n], axis=-1), cmap=cmap)
                                           
    axes[0].set_title("RGB")
    axes[1].set_title("IR")
    axes[2].set_title("Annotation")
    axes[3].set_title("Predicted")
    
    for ax in axes.flatten():
        ax.axis("off")
        
    plt.tight_layout()
    plt.show()
        
'''
CALCULATE METRICS
'''
def TUNet_Metrics(y_test, y_pred, metrics_filename):
    y_pred_classes = np.argmax(y_pred, axis=-1)
    y_true_classes = np.argmax(y_test, axis=-1)
    
    num_classes = 7
    
    iou_values, mean_iou = iou(y_true_classes, y_pred_classes, num_classes)
    pixel_acc = pixel_accuracy(y_true_classes, y_pred_classes)
    mean_acc = mean_accuracy(y_true_classes, y_pred_classes, num_classes)
    fw_iou_value = fw_iou(y_true_classes, y_pred_classes, num_classes)
    
    print(f"Mean IoU: {mean_iou}")
    print(f"Pixel accuracy: {pixel_acc}")
    print(f"Mean accuracy: {mean_acc}")
    print(f"Frequency-Weighted IoU: {fw_iou_value}")
    
    with open(metrics_filename, "a") as f:
        f.write("\nIoU Values:\n")
        
        for i, iou_val in enumerate(iou_values):
            f.write(f"Class {i}: {iou_val}\n")
        f.write(f"\nMean IoU: {mean_iou}\n")
        f.write(f"Pixel Accuracy: {pixel_acc}\n")
        f.write(f"Mean Accuracy: {mean_acc}\n")
        f.write(f"Frequency Weighted IoU: {fw_iou_value}\n")
    
print("Done")

In [None]:
try:
    with tf.device(gpu_p40):
        exp3_model = TUNet_TL()
except RuntimeError as e:
    print(e)
    
print("Done")

In [None]:
TUNet_Train(exp3_model, os.path.join(WEIGHTS_PATH, EXP3_FILENAME + '.h5'), 
                exp3_rgb_X_train, exp3_ir_X_train, exp3_y_train,
                exp3_rgb_X_val, exp3_ir_X_val, exp3_y_val)

print("Done")

In [None]:
### Test A

In [None]:
score = TUNet_Score(
    exp3_model, 
    os.path.join(WEIGHTS_PATH, EXP3_FILENAME + '.h5'),
    os.path.join(METRICS_PATH, EXP3_FILENAME + '.txt'),
    exp3_rgb_X_test_a,
    exp3_ir_X_test_a,
    exp3_y_test_a)

In [None]:
y_pred_a = TUNet_Predict(exp3_model, exp3_rgb_X_test_a, exp3_ir_X_test_a)

In [None]:
TUNet_Display(exp3_rgb_X_test_a, exp3_ir_X_test_a, exp3_y_test_a, y_pred_a)

In [None]:
TUNet_Metrics(exp3_y_test_a, y_pred_a, os.path.join(METRICS_PATH, EXP3_FILENAME + '.txt'))

In [None]:
## Test B

In [None]:
score = TUNet_Score(
    exp3_model, 
    os.path.join(WEIGHTS_PATH, EXP3_FILENAME + '.h5'),
    os.path.join(METRICS_PATH, EXP3_FILENAME + '.txt'),
    exp3_rgb_X_test_b,
    exp3_ir_X_test_b,
    exp3_y_test_b)

In [None]:
y_pred_b = TUNet_Predict(exp3_model, exp3_rgb_X_test_b, exp3_ir_X_test_b)

In [None]:
TUNet_Display(exp3_rgb_X_test_b, exp3_ir_X_test_b, exp3_y_test_b, y_pred_b)

In [None]:
TUNet_Metrics(exp3_y_test_b, y_pred_b, os.path.join(METRICS_PATH, EXP3_FILENAME + '.txt'))