In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from glob import glob
import tensorflow.keras.layers as L
from tensorflow.keras.layers import GlobalAveragePooling2D, Reshape, Dense, multiply
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50
from sklearn.model_selection import train_test_split
import cv2
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from tensorflow.keras import layers
from tqdm import tqdm
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, CSVLogger

In [None]:
# Training hyperparameters
batch_size, epochs = 6, 200
lr = 0.001

# Spatial size (in pixels) that every image and mask is resized to
width = height = 256

In [None]:
# Where the dataset is read from
dataset_path = "/kaggle/input/aeroscapes1/aeroscapes"

# Where this run writes its artefacts: the model checkpoint and the per-epoch CSV log
files_dir = os.path.join("files", "modified_uavid_dataset")
log_file = os.path.join(files_dir, "Log-Unet.csv")
model_file = os.path.join(files_dir, "UnetModel.keras")

# Function to create directory
def create_dir(path):
    """Create the directory ``path``, including missing parents, if it is absent.

    ``exist_ok=True`` makes the call idempotent and removes the window between a
    separate existence check and the creation, so a directory that appears in
    between no longer raises.

    Args:
        path: Directory to create.

    Raises:
        FileExistsError: if ``path`` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)
        
# Make sure the output directory exists before the checkpoint and CSV log
# (both located inside files_dir) are written during training.
create_dir(files_dir)

In [None]:
def load_data(path):
    """Collect the image and label file paths found under ``path``.

    Images are taken from ``<path>/JPEGImages`` and labels from
    ``<path>/Visualizations``. Each list is sorted by path, so pairing the two
    lists by position relies on matching file names in both folders.

    Returns:
        Tuple ``(Images, Labels)`` of sorted path lists.
    """
    image_pattern = os.path.join(path, "JPEGImages", "*")
    label_pattern = os.path.join(path, "Visualizations", "*")

    Images = glob(image_pattern)
    Images.sort()

    Labels = glob(label_pattern)
    Labels.sort()

    return Images, Labels

In [None]:
# Collect the sorted image / label path lists for the whole dataset
(Images, Labels) = load_data(dataset_path)

# Both counts must match, since images and labels are paired by position
print(f"New Train: {len(Images)} - {len(Labels)}")

# First, split off 10% of the data for testing
train_val_images, test_x, train_val_labels, test_y = train_test_split(Images, Labels, test_size=0.1, random_state=42)

# Then split the remaining 90% with test_size=0.22, i.e. roughly 70% of all data
# for training and 20% for validation (0.90 * 0.78 ~ 0.70, 0.90 * 0.22 ~ 0.20)
train_x, valid_x, train_y, valid_y = train_test_split(train_val_images, train_val_labels, test_size=0.22, random_state=42)

# Report the final size of each split
print(f"Training set: {len(train_x)} images")
print(f"Validation set: {len(valid_x)} images")
print(f"Test set: {len(test_x)} images")

In [None]:
def read_image(path):
    """Load an image from disk as a normalised float32 array.

    Args:
        path: File path, either ``bytes`` (as received when called through
            ``tf.numpy_function``) or ``str``.

    Returns:
        ``np.float32`` array resized to ``(height, width, 3)`` with values
        scaled to the range [0, 1]. The channels stay in OpenCV's BGR order;
        no colour conversion is applied here.

    Raises:
        FileNotFoundError: if OpenCV cannot read the file.
    """
    if isinstance(path, bytes):
        path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    # cv2.imread signals failure by returning None instead of raising; fail here
    # with the offending path rather than with an opaque error inside cv2.resize.
    if x is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    x = cv2.resize(x, (width, height))
    x = x / 255.0
    x = x.astype(np.float32)
    return x

In [None]:
# Lookup from the RGB colour found in a label image to the integer class index
# used as the training target (12 classes, index 0 is the background).
color_map = {
    (0, 0, 0): 0,            # Background
    (192, 128, 128): 1,      # Person
    (0, 128, 0): 2,          # Bike
    (128, 128, 128): 3,      # Car
    (128, 0, 0): 4,          # Drone
    (0, 0, 128): 5,          # Boat
    (192, 0, 128): 6,        # Animal
    (192, 0, 0): 7,          # Obstacle
    (192, 128, 0): 8,        # Construction
    (0, 64, 0): 9,           # Vegetation
    (128, 128, 0): 10,       # Road
    (0, 128, 128): 11,       # Sky
}

def read_mask(path):
    """Load a colour-coded label image and convert it to class indices.

    Args:
        path: File path, either ``bytes`` (as received when called through
            ``tf.numpy_function``) or ``str``.

    Returns:
        ``np.uint8`` array of shape ``(height, width)`` holding the class index
        of every pixel. Pixels whose colour is not listed in ``color_map`` keep
        the initial value 0 (background).

    Raises:
        FileNotFoundError: if OpenCV cannot read the file.
    """
    if isinstance(path, bytes):
        path = path.decode()
    mask = cv2.imread(path)
    # cv2.imread signals failure by returning None instead of raising.
    if mask is None:
        raise FileNotFoundError(f"Could not read mask: {path}")
    # OpenCV loads BGR, while color_map is keyed by RGB triples.
    mask = cv2.cvtColor(mask, cv2.COLOR_BGR2RGB)
    # Nearest-neighbour keeps the exact class colours; other interpolation
    # modes would blend neighbouring colours into values absent from color_map.
    mask = cv2.resize(mask, (width, height), interpolation=cv2.INTER_NEAREST)

    class_indices = np.zeros((height, width), dtype=np.uint8)

    for rgb, idx in color_map.items():
        class_indices[(mask == rgb).all(axis=-1)] = idx

    return class_indices

In [None]:
def tf_parse(x, y):
    """Load one (image, mask) pair from their file paths inside a tf.data pipeline.

    The NumPy/OpenCV loaders are wrapped with ``tf.numpy_function``; the static
    shapes of both results are then set explicitly.

    Args:
        x: Image file path tensor.
        y: Mask file path tensor.

    Returns:
        Tuple ``(image, mask)``: float32 ``(height, width, 3)`` and
        uint8 ``(height, width)``.
    """
    def _load_pair(image_path, mask_path):
        return read_image(image_path), read_mask(mask_path)

    image, mask = tf.numpy_function(_load_pair, [x, y], [tf.float32, tf.uint8])
    image.set_shape([height, width, 3])
    mask.set_shape([height, width])
    return image, mask

In [None]:
def tf_dataset(x, y, batch=6):
    """Build a batched, prefetched dataset of (image, mask) pairs.

    Args:
        x: Sequence of image file paths.
        y: Sequence of mask file paths, aligned with ``x``.
        batch: Number of samples per batch.

    Returns:
        ``tf.data.Dataset`` yielding batches produced by ``tf_parse``. The
        samples keep the order of ``x`` / ``y``; no shuffling is applied.
    """
    return (
        tf.data.Dataset.from_tensor_slices((x, y))
        .map(tf_parse, num_parallel_calls=tf.data.AUTOTUNE)
        .batch(batch)
        .prefetch(tf.data.AUTOTUNE)
    )

In [None]:
# One input pipeline per split, all built with the same batch size
train_dataset, valid_dataset, test_dataset = (
    tf_dataset(paths_x, paths_y, batch=batch_size)
    for paths_x, paths_y in ((train_x, train_y), (valid_x, valid_y), (test_x, test_y))
)

In [None]:
# Colours of all classes in the insertion order of color_map, once as 0-255
# tuples and once scaled to the 0-1 range that matplotlib expects
color_map_values = [rgb for rgb in color_map]
color_map_rgb = np.array(color_map_values) / 255.0

def plot_samples_with_labels(dataset, title):
    """Show up to four image / mask pairs taken from the first batch of ``dataset``.

    Args:
        dataset: Batched dataset yielding ``(images, masks)``; the masks hold
            class indices.
        title: Index into ``['Train', 'Validation', 'Test']`` that selects the
            prefix of the subplot titles. A string is also accepted and is
            used as the prefix verbatim.
    """
    split_name = title if isinstance(title, str) else ['Train', 'Validation', 'Test'][title]

    plt.figure(figsize=(12, 12))
    for images, masks in dataset.take(1):  # Take a single batch
        images = np.asarray(images)
        masks = np.asarray(masks)

        # A batch can hold fewer than four samples (small dataset / batch size).
        for j in range(min(4, len(images))):
            # Plot the image. read_image keeps OpenCV's BGR channel order, so
            # the channels are reversed to the RGB order matplotlib expects.
            plt.subplot(4, 4, j*2 + 1)
            plt.imshow(images[j][..., ::-1])
            plt.axis("off")
            plt.title(f"{split_name} Image {j+1}")

            # Convert class indices in the mask to RGB colours. The class index
            # comes from color_map itself instead of the position in a list,
            # so the colours stay right whatever the dictionary order is.
            mask_rgb = np.zeros(masks[j].shape + (3,), dtype=np.float32)
            for rgb, idx in color_map.items():
                mask_rgb[masks[j] == idx] = np.array(rgb) / 255.0

            # Plot the label mask
            plt.subplot(4, 4, j*2 + 2)
            plt.imshow(mask_rgb)
            plt.axis("off")
            plt.title(f"{split_name} Mask {j+1}")

    plt.tight_layout()
    plt.show()

# Plot sample images with their label masks for every split; the position in
# the tuple (0 = train, 1 = validation, 2 = test) doubles as the title index
for split_index, split_dataset in enumerate((train_dataset, valid_dataset, test_dataset)):
    plot_samples_with_labels(split_dataset, title=split_index)

In [None]:
@tf.keras.utils.register_keras_serializable(package='Custom', name='BAM')
class BAM(tf.keras.layers.Layer):
    """Attention layer that re-weights a feature map per channel and per pixel.

    Channel branch: global average pooling -> Dense(bottleneck, relu) ->
    Dense(channel) -> sigmoid, reshaped to ``(batch, 1, 1, channels)``.

    Spatial branch (applied to the channel-weighted input): 1x1 conv ->
    ``dilation_conv_num`` 3x3 convs with dilation rates 1, 2, ... -> 1x1 conv
    to a single channel -> sigmoid.

    The output is ``x * spatial_attention + x * channel_attention``.

    Args:
        channel: Number of channels of the input feature map.
        reduction: Divisor giving the bottleneck width ``channel // reduction``
            (at least 1).
        dilation_conv_num: Number of dilated 3x3 convolutions in the spatial branch.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, channel, reduction=16, dilation_conv_num=2, **kwargs):  # Accept arbitrary kwargs
        super(BAM, self).__init__(**kwargs)

        # Kept so that get_config() can report the constructor arguments.
        self.channel = channel
        self.reduction = reduction
        self.dilation_conv_num = dilation_conv_num

        # channel // reduction is 0 when channel < reduction, which is not a
        # valid layer width; keep at least one unit / filter.
        bottleneck = max(1, channel // reduction)

        # Channel Attention Module
        self.channel_avg_pool = layers.GlobalAveragePooling2D()
        self.channel_fc1 = layers.Dense(bottleneck, activation='relu')
        self.channel_fc2 = layers.Dense(channel)

        # Spatial Attention Module
        self.spatial_conv1 = layers.Conv2D(bottleneck, kernel_size=1, activation='relu')
        self.spatial_dilated_convs = [
            layers.Conv2D(bottleneck, kernel_size=3, padding='same', dilation_rate=dilation, activation='relu')
            for dilation in range(1, dilation_conv_num + 1)
        ]
        self.spatial_conv2 = layers.Conv2D(1, kernel_size=1)

    def call(self, x):
        """Apply channel and spatial attention to the feature map ``x``."""
        # Channel Attention
        channel_attention = self.channel_avg_pool(x)
        channel_attention = self.channel_fc1(channel_attention)
        channel_attention = self.channel_fc2(channel_attention)
        channel_attention = tf.nn.sigmoid(channel_attention)
        # One weight per channel, broadcast over the two spatial axes
        channel_attention = tf.reshape(channel_attention, [-1, 1, 1, x.shape[-1]])
        channel_refined = x * channel_attention

        # Spatial Attention
        spatial_attention = self.spatial_conv1(channel_refined)
        for conv in self.spatial_dilated_convs:
            spatial_attention = conv(spatial_attention)
        spatial_attention = self.spatial_conv2(spatial_attention)
        spatial_attention = tf.nn.sigmoid(spatial_attention)

        # Combining Attention
        refined_feature = x * spatial_attention + x * channel_attention
        return refined_feature

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created on load."""
        config = super(BAM, self).get_config()
        config.update({
            "channel": self.channel,
            "reduction": self.reduction,
            "dilation_conv_num": self.dilation_conv_num,
        })
        return config

In [None]:
def conv_block(x, num_filters, act=True, use_bam=False):
    """Apply a separable-convolution block, optionally extended, to ``x``.

    The block always runs SeparableConv2D(3x3) -> BatchNormalization -> ReLU.

    Args:
        x: Input feature map.
        num_filters: Number of filters of every convolution in the block.
        act: When true, append Conv2D(3x3, relu) -> BatchNormalization -> ReLU.
        use_bam: When true, finish with a ``BAM`` attention layer.

    Returns:
        The transformed feature map.
    """
    stages = [
        L.SeparableConv2D(num_filters, kernel_size=3, padding="same"),
        L.BatchNormalization(),
        L.Activation("relu"),
    ]

    if act:
        stages += [
            L.Conv2D(num_filters, (3, 3), activation='relu', padding='same'),
            L.BatchNormalization(),
            L.Activation("relu"),
        ]

    if use_bam:
        stages.append(BAM(channel=num_filters))

    for stage in stages:
        x = stage(x)
    return x

In [None]:
def unet_three_plus(input_shape, expansion=1, num_classes=12):
    """Build a UNet 3+ style segmentation model on a ResNet50 encoder with BAM attention.

    Every decoder stage fuses five branches (encoder features and earlier
    decoder outputs, each resampled to the stage's resolution and passed
    through ``conv_block``), followed by one more ``conv_block``.

    Args:
        input_shape: Shape ``(height, width, channels)`` of the input images.
            The pooling / upsampling factors below rely on the five encoder
            taps being at 1/2, 1/4, 1/8, 1/16 and 1/32 of the input size.
        expansion: Width multiplier of the decoder. Each branch has
            ``16 * expansion`` filters and each fused stage ``16 * 5 * expansion``.
        num_classes: Number of channels of the final softmax output. Defaults
            to 12, the value that used to be hard-coded.

    Returns:
        ``tf.keras.Model`` named ``"Unet3PlusBAM"`` producing per-pixel class
        probabilities at the input resolution.
    """
    branch_filters = 16 * expansion
    fused_filters = 16 * 5 * expansion

    def branch(x):
        # Bring a feature map to the common per-branch width
        return conv_block(x, branch_filters, act=False, use_bam=True)

    def pooled(x, factor):
        # Downsample by `factor`, then apply the branch block
        return branch(L.MaxPool2D((factor, factor))(x))

    def upsampled(x, factor):
        # Bilinearly upsample by `factor`, then apply the branch block
        return branch(L.UpSampling2D((factor, factor), interpolation="bilinear")(x))

    def fuse(branches):
        # Concatenate the five branches and mix them with one more block
        merged = L.Concatenate()(branches)
        return conv_block(merged, fused_filters, act=False, use_bam=True)

    # Inputs
    inputs = L.Input(input_shape, name="input_layer")  ## (256 x 256 x 3)

    # Encoder: ImageNet-pretrained ResNet50, tapped at five depths
    base_model = ResNet50(weights='imagenet', include_top=False, input_tensor=inputs)
    e1 = base_model.get_layer('conv1_relu').output
    e2 = base_model.get_layer('conv2_block3_out').output
    e3 = base_model.get_layer('conv3_block4_out').output
    e4 = base_model.get_layer('conv4_block6_out').output
    e5 = base_model.get_layer('conv5_block3_out').output

    # Apply BAM to the encoder outputs (e1 is deliberately left untouched)
    e2 = BAM(channel=e2.shape[-1])(e2)
    e3 = BAM(channel=e3.shape[-1])(e3)
    e4 = BAM(channel=e4.shape[-1])(e4)
    e5 = BAM(channel=e5.shape[-1])(e5)

    # Decoder 4 (resolution of e4)
    d4 = fuse([
        pooled(e1, 8),
        pooled(e2, 4),
        pooled(e3, 2),
        branch(e4),
        upsampled(e5, 2),
    ])

    # Decoder 3 (resolution of e3)
    d3 = fuse([
        pooled(e1, 4),
        pooled(e2, 2),
        branch(e3),
        upsampled(d4, 2),
        upsampled(e5, 4),
    ])

    # Decoder 2 (resolution of e2)
    d2 = fuse([
        pooled(e1, 2),
        branch(e2),
        upsampled(d3, 2),
        upsampled(d4, 4),
        upsampled(e5, 8),
    ])

    # Decoder 1 (resolution of e1), then upsample once more by a factor of two
    d1 = fuse([
        branch(e1),
        upsampled(d2, 2),
        upsampled(d3, 4),
        upsampled(d4, 8),
        upsampled(e5, 16),
    ])
    d1 = L.UpSampling2D((2, 2), interpolation="bilinear")(d1)

    # Output: per-pixel class probabilities
    y1 = L.Conv2D(num_classes, kernel_size=1, padding="same")(d1)
    y1 = L.Activation("softmax")(y1)

    outputs = [y1]

    model = tf.keras.Model(inputs, outputs, name="Unet3PlusBAM")
    return model

In [None]:
input_shape = (height, width, 3)
num_classes = 12

# Instantiate the model.
# The second positional parameter of unet_three_plus is `expansion` (the decoder
# width multiplier), not the class count. Passing num_classes there silently
# built a decoder 12 times wider than the default, so it is no longer passed.
# To reproduce that wider network on purpose, pass expansion=12 explicitly.
model = unet_three_plus(input_shape)

# The network's output depth must agree with the number of classes in color_map
assert model.outputs[0].shape[-1] == num_classes, "model output depth does not match num_classes"

In [None]:
# Print the architecture of the freshly built model for inspection
model.summary()

In [None]:
# Adam optimiser with the configured learning rate; the targets are integer
# class indices, hence the sparse loss and the sparse accuracy metric
opt = tf.keras.optimizers.Adam(learning_rate=lr)
model.compile(
    optimizer=opt,
    loss='sparse_categorical_crossentropy',
    metrics=["sparse_categorical_accuracy"],
)

In [None]:
# Keep only the best model on disk
checkpoint_cb = ModelCheckpoint(model_file, verbose=1, save_best_only=True)
# Cut the learning rate to a tenth after 4 epochs without val_loss improvement
reduce_lr_cb = ReduceLROnPlateau(monitor="val_loss", mode='auto', factor=0.1, patience=4)
# Append the metrics of every epoch to the CSV log
csv_logger_cb = CSVLogger(log_file)
# Stop after 10 epochs without val_loss improvement and restore the best weights
early_stopping_cb = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)

callbacks = [checkpoint_cb, reduce_lr_cb, csv_logger_cb, early_stopping_cb]

In [None]:
# fit() returns a History object, not the model. Storing it under its own name
# keeps `model` usable afterwards and makes re-running this cell safe.
history = model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=epochs,
    callbacks=callbacks,
    verbose=1
)

In [None]:
# Reload the best checkpoint written by ModelCheckpoint.
# BAM is registered with package='Custom' and name='BAM', so the key
# "custom_BAM" matched neither form. Both the registered name and the bare
# class name are provided so that the lookup does not depend solely on the
# decorator-based registration.
custom_objects = {
    "Custom>BAM": BAM,
    "BAM": BAM,
}
model = tf.keras.models.load_model(model_file, custom_objects=custom_objects)

In [None]:
# Load the CSV log file written during training
# (os and matplotlib.pyplot are already imported in the notebook's first cell)
log_file = os.path.join(files_dir, "Log-Unet.csv")
log_data = pd.read_csv(log_file)

# Check available columns in the CSV
print(log_data.columns)


def _plot_training_curves(history_frame, train_column, val_column, metric_name, out_path):
    """Plot a training / validation metric pair per epoch, save it and show it.

    Args:
        history_frame: DataFrame with one row per epoch.
        train_column: Column holding the training values.
        val_column: Column holding the validation values.
        metric_name: Human-readable metric name used in labels and title.
        out_path: File the figure is written to.
    """
    plt.figure(figsize=(6, 6))
    plt.plot(history_frame[train_column], label=f'Training {metric_name}')
    plt.plot(history_frame[val_column], label=f'Validation {metric_name}')
    plt.xlabel('Epoch')
    plt.ylabel(metric_name)
    plt.title(f'Training and Validation {metric_name}')
    plt.legend()
    # Write the image file first, then display the figure
    plt.savefig(out_path)
    plt.show()


# Plot Training and Validation Loss
loss_plot_file_path = os.path.join(files_dir, 'training_validation_loss.png')
_plot_training_curves(log_data, 'loss', 'val_loss', 'Loss', loss_plot_file_path)

# Plot Training and Validation Accuracy
accuracy_plot_file_path = os.path.join(files_dir, 'training_validation_accuracy.png')
_plot_training_curves(
    log_data,
    'sparse_categorical_accuracy',
    'val_sparse_categorical_accuracy',
    'Accuracy',
    accuracy_plot_file_path,
)

In [None]:
# Inverse of color_map: class index -> RGB colour (0-255)
class_to_rgb = {class_idx: rgb for rgb, class_idx in color_map.items()}

# The same colours scaled to 0-1, and a discrete matplotlib colormap whose
# bin i (centred on the integer i) is drawn in the colour of class i
class_colors = {
    class_idx: tuple(channel / 255.0 for channel in rgb)
    for class_idx, rgb in class_to_rgb.items()
}
colors = np.array([class_colors[class_idx] for class_idx in sorted(class_colors)])
cmap = mcolors.ListedColormap(colors)
norm = mcolors.BoundaryNorm(
    boundaries=np.arange(len(class_colors) + 1) - 0.5,
    ncolors=len(class_colors),
)

def map_class_to_rgb(class_mask):
    """Turn a 2-D array of class indices into a uint8 RGB image.

    Every pixel receives the colour that ``class_to_rgb`` lists for its class
    index; indices without an entry stay black.
    """
    n_rows, n_cols = class_mask.shape[0], class_mask.shape[1]
    rgb_mask = np.zeros((n_rows, n_cols, 3), dtype=np.uint8)
    for class_index in class_to_rgb:
        rgb_mask[class_mask == class_index] = class_to_rgb[class_index]
    return rgb_mask


# Show input, ground truth and prediction side by side for the first test batch
plt.figure(figsize=(15, 10))

batch = next(iter(test_dataset))
batch_x, batch_y = batch

num_images = batch_x.shape[0]

# Run the network once on the whole batch instead of once per image
predictions = model.predict(batch_x)

for i in range(num_images):

    image = batch_x[i].numpy()
    mask = batch_y[i].numpy()

    predicted_class_indices = np.argmax(predictions[i], axis=-1)
    predicted_mask_rgb = map_class_to_rgb(predicted_class_indices)

    # Ground truth taken from the batch itself: it cannot get out of step with
    # the images, and it avoids re-reading the label file with an interpolating
    # resize that would blend neighbouring class colours.
    original_label = map_class_to_rgb(mask)

    plt.subplot(num_images, 3, 3*i + 1)
    # read_image keeps OpenCV's BGR channel order; reverse it for matplotlib
    plt.imshow(image[..., ::-1])
    plt.title(f"Input Image {i+1}")
    plt.axis("off")

    plt.subplot(num_images, 3, 3*i + 2)
    plt.imshow(original_label)
    plt.title(f"Original Label {i+1}")
    plt.axis("off")

    plt.subplot(num_images, 3, 3*i + 3)
    plt.imshow(predicted_mask_rgb)
    plt.title(f"Predicted Mask {i+1}")
    plt.axis("off")

plt.tight_layout()
plt.show()

In [None]:
import time

# Directory that receives one colour-coded prediction per test image.
# save_path was used below without ever being defined.
save_path = os.path.join(files_dir, "predictions")
os.makedirs(save_path, exist_ok=True)

# Wall-clock duration of every predict() call, in seconds
time_taken = []
for image_path in tqdm(test_x):

    # Name the output after the folder two levels above the file plus the file name
    seq_folder = image_path.split("/")[-3]
    image_name = image_path.split("/")[-1]

    # Same preprocessing as read_image: BGR, resized, scaled to [0, 1]
    x = cv2.imread(image_path, cv2.IMREAD_COLOR)
    if x is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    x = cv2.resize(x, (width, height))
    x = x / 255.0
    x = np.expand_dims(x, axis=0)

    start_time = time.time()
    # verbose=0: the per-call progress bar would otherwise interleave with tqdm
    p = model.predict(x, verbose=0)[0]
    total_time = time.time() - start_time
    time_taken.append(total_time)

    p_class_indices = np.argmax(p, axis=-1)

    # Paint every pixel in the colour of its predicted class
    p_rgb = np.zeros((p_class_indices.shape[0], p_class_indices.shape[1], 3), dtype=np.uint8)

    for rgb, idx in color_map.items():
        p_rgb[p_class_indices == idx] = rgb

    # cv2.imwrite expects BGR channel order
    p_rgb = cv2.cvtColor(p_rgb, cv2.COLOR_RGB2BGR)

    # NOTE(review): the output keeps the input's file name and extension; if
    # that is .jpg, lossy compression will alter the class colours - confirm
    # whether a lossless format is wanted here.
    save_path_with_name = os.path.join(save_path, f"{seq_folder}_{image_name}")
    cv2.imwrite(save_path_with_name, p_rgb)

In [None]:
# Archive everything under /kaggle/working into a single zip file and display
# a link to that archive in the notebook output
!zip -r BAMunet3plusAERO.zip /kaggle/working
from IPython.display import FileLink
FileLink(r'BAMunet3plusAERO.zip')