In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split
import json
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, concatenate, Conv2DTranspose
import tensorflow as tf
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation

import tensorflow as tf
from tensorflow.keras.layers import Input, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2

from matplotlib import pyplot as plt
import cv2


In [None]:
# Load image and mask paths
def load_image_paths(base_dir, scene_id):
    """Build the three file paths that belong to one scene.

    Args:
        base_dir: Directory that contains the rendered images.
        scene_id: Scene identifier used as the file-name prefix. Any value
            that can be formatted into a string is accepted (e.g. str or int).

    Returns:
        A ``(before_image, after_image, mask)`` tuple of path strings:
        ``<base_dir>/<scene_id>_change-0.png``,
        ``<base_dir>/<scene_id>_change-1.png`` and
        ``<base_dir>/<scene_id>_mask.png``.
    """
    # A literal '/' separator is kept (rather than os.path.join) so the
    # resulting strings are identical on every platform.
    prefix = f'{base_dir}/{scene_id}'
    before_image1 = f'{prefix}_change-0.png'
    after_image1 = f'{prefix}_change-1.png'
    mask = f'{prefix}_mask.png'
    return before_image1, after_image1, mask

def get_image_pairs_and_masks(base_dir, scene_ids):
    """Collect the image-pair paths and mask paths for several scenes.

    Args:
        base_dir: Directory that contains the rendered images.
        scene_ids: Iterable of scene identifiers.

    Returns:
        ``(image_pairs, masks)`` where ``image_pairs`` is a list of
        ``(before_path, after_path)`` tuples and ``masks`` is the list of
        mask paths, both in the order of ``scene_ids``.
    """
    # Resolve every scene once, then split the triples into the two lists.
    path_triples = [load_image_paths(base_dir, sid) for sid in scene_ids]
    pair_list = [(before_path, after_path) for before_path, after_path, _ in path_triples]
    mask_list = [mask_path for _, _, mask_path in path_triples]
    return pair_list, mask_list

In [None]:
# Data generator
class ChangeDetectionDataset(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that yields batches for change detection.

    Every sample is a before/after image pair stacked along the channel axis
    (6 channels, scaled to [0, 1]) together with a 4-channel one-hot mask
    decoded from a colour-coded mask image.

    The annotation JSON is always loaded; its ``'images'`` list is indexed by
    ``'id'`` into ``self.images``. When no explicit paths are given, the
    ``'scene'`` entry of every image record is used to build the file paths.

    Args:
        image_pairs: Optional list of ``(before_path, after_path)`` tuples.
        masks: Optional list of mask paths, aligned with ``image_pairs``.
            ``image_pairs`` and ``masks`` must be given together or both omitted.
        batch_size: Number of samples per batch (the last batch may be smaller).
        image_size: ``(height, width)`` every image is resized to when loaded.
        shuffle: Whether ``on_epoch_end`` reshuffles the sample order.
        annotation_path: Path of the annotation JSON file.
        base_dir: Image directory used when paths are derived from the annotations.

    Raises:
        ValueError: If only one of ``image_pairs`` / ``masks`` is given or
            their lengths differ.
    """

    def __init__(self, image_pairs = None, masks = None, batch_size=4, image_size=(256, 256), shuffle=True,
                 annotation_path='utils/synthetic_anno.json', base_dir='data/renders_multicam_diff_1'):
        super().__init__()
        # Read the annotations through a context manager so the file is closed again.
        with open(annotation_path) as json_file:
            self.coco = json.load(json_file)
        self.process_images()

        if image_pairs is None and masks is None:
            self.image_pairs, self.masks = self.get_image_pairs_and_masks(base_dir)
        elif image_pairs is None or masks is None:
            raise ValueError("image_pairs and masks must be provided together")
        elif len(image_pairs) != len(masks):
            raise ValueError(
                "image_pairs and masks must have the same length, got {} and {}".format(
                    len(image_pairs), len(masks)))
        else:
            self.image_pairs = image_pairs
            self.masks = masks

        self.batch_size = batch_size
        self.image_size = image_size
        self.shuffle = shuffle
        # Samples are served in their given order until on_epoch_end() is first called.
        self.indices = np.arange(len(self.image_pairs))

    def get_image_pairs_and_masks(self, base_dir):
        """Build image-pair and mask paths for every scene in the annotations.

        Args:
            base_dir: Directory that contains the rendered images.

        Returns:
            ``(image_pairs, masks)``: a list of ``(before_path, after_path)``
            tuples and the matching list of mask paths.
        """
        image_pairs = []
        masks = []

        for item in self.coco['images']:
            before_image1, after_image1, mask = load_image_paths(base_dir, item['scene'])
            image_pairs.append((before_image1, after_image1))
            masks.append(mask)

        return image_pairs, masks

    def __len__(self):
        """Return the number of batches per epoch (the last one may be partial)."""
        return int(np.ceil(len(self.image_pairs) / self.batch_size))

    def __getitem__(self, index):
        """Load and return batch number ``index`` as ``(X, y)``."""
        indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        batch_image_pairs = [self.image_pairs[i] for i in indices]
        batch_masks = [self.masks[i] for i in indices]

        X, y = self.__data_generation(batch_image_pairs, batch_masks)
        return X, y

    def on_epoch_end(self):
        """Reset the sample order and reshuffle it when ``shuffle`` is enabled."""
        self.indices = np.arange(len(self.image_pairs))
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __data_generation(self, batch_image_pairs, batch_masks):
        """Read one batch from disk.

        Returns:
            ``X``: float32 array ``(n, height, width, 6)`` holding the before
            image in channels 0-2 and the after image in channels 3-5, scaled
            to [0, 1].
            ``y``: float32 array ``(n, height, width, 4)`` of one-hot masks.
            ``n`` is the number of samples actually in this batch.
        """
        # Size the arrays by the real sample count so a partial final batch
        # is not padded with all-zero images and masks.
        num_samples = len(batch_image_pairs)
        X = np.zeros((num_samples, *self.image_size, 6), dtype=np.float32)  # 6 channels for concatenated images
        y = np.zeros((num_samples, *self.image_size, 4), dtype=np.float32)  # 4 channels, one per class

        for i, (img_paths, mask_path) in enumerate(zip(batch_image_pairs, batch_masks)):
            before_img = img_to_array(load_img(img_paths[0], target_size=self.image_size)) / 255.0
            after_img = img_to_array(load_img(img_paths[1], target_size=self.image_size)) / 255.0
            # The mask stays in the 0-255 range because rgb_to_onehot matches
            # exact 0/255 colours; nearest-neighbour resizing avoids blended colours.
            mask = img_to_array(load_img(mask_path, target_size=self.image_size, interpolation='nearest'))

            X[i, :, :, :3] = before_img
            X[i, :, :, 3:] = after_img
            y[i, :, :, :] = self.rgb_to_onehot(mask)

        return X, y

    def process_images(self):
        """Index the annotation image records by their ``'id'`` into ``self.images``."""
        self.images = {}
        for image in self.coco['images']:
            image_id = image['id']
            if image_id in self.images:
                print("ERROR: Skipping duplicate image id: {}".format(image))
            else:
                self.images[image_id] = image

    @staticmethod
    def rgb_to_onehot(rgb_image):
        """Convert a colour-coded mask into a 4-channel one-hot array.

        Args:
            rgb_image: Array of shape ``(height, width, 3)`` with values in
                the 0-255 range.

        Returns:
            float32 array of shape ``(height, width, 4)``. Channels are
            background (black), taken (red), added (green) and shifted (blue).
            Pixels matching none of the four colours stay all-zero.
        """
        class_colors = (
            (0, 0, 0),      # Background
            (255, 0, 0),    # Red (Taken)
            (0, 255, 0),    # Green (Added)
            (0, 0, 255),    # Blue (Shifted)
        )
        onehot_image = np.zeros((rgb_image.shape[0], rgb_image.shape[1], 4), dtype=np.float32)
        for class_index, color in enumerate(class_colors):
            matches = (rgb_image == np.array(color)).all(axis=-1)
            onehot_image[matches, class_index] = 1.0
        return onehot_image

In [None]:
dataset = ChangeDetectionDataset()

# Summarise the annotation index instead of dumping the whole dict into the cell output.
print(f"{len(dataset.images)} annotated images")
print(dict(list(dataset.images.items())[:3]))

In [None]:
# Split the paths collected by `dataset` into train / validation / test subsets.
# NOTE(review): base_dir and images_arr are not used anywhere below, and this
# base_dir differs from the 'data/renders_multicam_diff_1' directory used inside
# ChangeDetectionDataset — confirm which directory name is correct.
base_dir = 'data/renders_multicam_diff1'
images_arr = dataset.images

# Hold out 20% of all samples for testing ...
image_pairs_train, image_pairs_test, masks_train, masks_test = train_test_split(
    dataset.image_pairs,
    dataset.masks,
    test_size=0.2,
    random_state=42,
)

# ... then 20% of the remaining samples for validation.
image_pairs_train, image_pairs_validation, masks_train, masks_validation = train_test_split(
    image_pairs_train,
    masks_train,
    test_size=0.2,
    random_state=42,
)

# One generator per subset; only the test generator has shuffling switched off.
train_dataset = ChangeDetectionDataset(image_pairs=image_pairs_train, masks=masks_train, batch_size=4)
validation_dataset = ChangeDetectionDataset(image_pairs=image_pairs_validation, masks=masks_validation, batch_size=4)
test_dataset = ChangeDetectionDataset(image_pairs=image_pairs_test, masks=masks_test, batch_size=4, shuffle=False)

In [None]:
# Print the image-pair paths of each subset: train, validation, then test.
for split_dataset in (train_dataset, validation_dataset, test_dataset):
    print(split_dataset.image_pairs)

In [None]:
# Number of training samples, followed by the number of training batches.
num_train_samples = len(train_dataset.image_pairs)
print(num_train_samples)
print(len(train_dataset))
#X, y = train_dataset.__getitem__(0)

# print("X shape:", X.shape)
# print("y shape:", y.shape)
# print(y)

In [None]:
# def siamese_unet(input_size=(256, 256, 3)):
#     inputs1 = Input(input_size)
#     inputs2 = Input(input_size)
    
#     def unet_encoder(inputs):
#         conv1 = Conv2D(64, 3, activation='relu', padding='same')(inputs)
#         conv1 = Conv2D(64, 3, activation='relu', padding='same')(conv1)
#         pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
        
#         conv2 = Conv2D(128, 3, activation='relu', padding='same')(pool1)
#         conv2 = Conv2D(128, 3, activation='relu', padding='same')(conv2)
#         pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
        
#         conv3 = Conv2D(256, 3, activation='relu', padding='same')(pool2)
#         conv3 = Conv2D(256, 3, activation='relu', padding='same')(conv3)
        
#         return conv1, conv2, conv3
    
#     enc1 = unet_encoder(inputs1)
#     enc2 = unet_encoder(inputs2)
    
#     def unet_decoder(enc1, enc2):
#         merge = concatenate([enc1[2], enc2[2]])
        
#         conv4 = Conv2D(512, 3, activation='relu', padding='same')(merge)
#         conv4 = Conv2D(512, 3, activation='relu', padding='same')(conv4)
        
#         up5 = concatenate([Conv2DTranspose(256, 2, strides=(2, 2), padding='same')(conv4), enc1[1], enc2[1]])
#         conv5 = Conv2D(256, 3, activation='relu', padding='same')(up5)
#         conv5 = Conv2D(256, 3, activation='relu', padding='same')(conv5)
        
#         up6 = concatenate([Conv2DTranspose(128, 2, strides=(2, 2), padding='same')(conv5), enc1[0], enc2[0]])
#         conv6 = Conv2D(128, 3, activation='relu', padding='same')(up6)
#         conv6 = Conv2D(128, 3, activation='relu', padding='same')(conv6)
        
#         conv7 = Conv2D(64, 3, activation='relu', padding='same')(conv6)
#         conv7 = Conv2D(64, 3, activation='relu', padding='same')(conv7)
        
#         outputs = Conv2D(3, 1, activation='softmax')(conv7)  # 3 channels for color-coded output
        
#         return outputs
    
#     outputs = unet_decoder(enc1, enc2)
#     model = tf.keras.models.Model(inputs=[inputs1, inputs2], outputs=outputs)
#     return model

# model = siamese_unet()

# initial_learning_rate = 1e-4 
# optimizer = Adam(learning_rate=initial_learning_rate)
# model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
# model.summary()

    
def unet_model(input_size=(256, 256, 6), num_classes=4, l2_weight=0.001, dropout_rate=0.2):
    """Build a small U-Net for per-pixel classification.

    The encoder has three resolution levels (16, 32 and 64 filters) separated
    by 2x2 max-pooling and dropout, followed by a 128-filter bottleneck. The
    decoder upsamples twice with transposed convolutions and concatenates the
    matching encoder features as skip connections. The final 1x1 convolution
    applies a softmax over ``num_classes`` channels.

    Args:
        input_size: Input shape ``(height, width, channels)``; the default has
            6 channels for two concatenated RGB images.
        num_classes: Number of output channels of the softmax layer.
        l2_weight: L2 regularisation factor of every regularised convolution.
        dropout_rate: Dropout rate applied after each pooling layer.

    Returns:
        An uncompiled Keras ``Model``.
    """
    def conv_bn(tensor, filters, kernel_size):
        # One ReLU convolution followed by batch normalisation.
        tensor = Conv2D(filters, kernel_size, activation='relu', padding='same',
                        kernel_regularizer=l2(l2_weight))(tensor)
        return BatchNormalization()(tensor)

    def double_conv(tensor, filters, kernel_size=3):
        # Two consecutive conv + batch-norm stages with the same settings.
        tensor = conv_bn(tensor, filters, kernel_size)
        return conv_bn(tensor, filters, kernel_size)

    def downsample(tensor):
        # Halve the spatial resolution, then apply dropout.
        tensor = MaxPooling2D(pool_size=(2, 2))(tensor)
        return Dropout(dropout_rate)(tensor)

    inputs = Input(input_size)

    # ---- Encoder ----
    # NOTE(review): the first level uses a 6x6 kernel while every other level
    # uses 3x3 — confirm this is intentional.
    conv1 = double_conv(inputs, 16, kernel_size=6)
    conv2 = double_conv(downsample(conv1), 32)
    conv3 = double_conv(downsample(conv2), 64)

    # ---- Bottleneck ----
    conv4 = double_conv(conv3, 128)

    # ---- Decoder with skip connections ----
    up5 = concatenate([Conv2DTranspose(64, 2, strides=(2, 2), padding='same')(conv4), conv2])
    conv5 = double_conv(up5, 64)

    up6 = concatenate([Conv2DTranspose(32, 2, strides=(2, 2), padding='same')(conv5), conv1])
    conv6 = double_conv(up6, 32)

    conv7 = double_conv(conv6, 16)

    # One softmax channel per class.
    outputs = Conv2D(num_classes, 1, activation='softmax')(conv7)

    model = Model(inputs=inputs, outputs=outputs)
    return model

# Build the network with its default settings and configure it for
# multi-class per-pixel classification.
model = unet_model()
model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

In [None]:
# Number of batches the training generator yields per epoch.
print(len(train_dataset))

In [None]:
# Train for 10 epochs, evaluating on the validation generator after each epoch.
model.fit(
    x=train_dataset,
    validation_data=validation_dataset,
    epochs=10,
)

In [None]:
# Display only the shape of the predictions rather than the entire tensor.
model.predict(test_dataset).shape

In [None]:
predictions = model.predict(test_dataset)

# Each prediction holds 4 class probabilities per pixel. Passing it straight to
# imshow would treat it as RGBA, so take the most likely class per pixel and
# colour it: background=black, taken=red, added=green, shifted=blue.
class_colors = np.array([[0, 0, 0], [255, 0, 0], [0, 255, 0], [0, 0, 255]], dtype=np.uint8)
predicted_classes = np.argmax(predictions[2], axis=-1)
plt.imshow(class_colors[predicted_classes])

In [None]:
def onehot_to_rgb(onehot_mask):
    """Turn a 4-channel one-hot mask into a colour-coded RGB image.

    Args:
        onehot_mask: Array of shape ``(height, width, 4)``; a pixel belongs
            to class ``k`` where channel ``k`` equals exactly 1.

    Returns:
        float32 array of shape ``(height, width, 3)`` with values 0 or 255.
        Pixels with no channel equal to 1 stay black.
    """
    # Colour of each class, listed in channel order.
    palette = (
        [0, 0, 0],      # Background
        [255, 0, 0],    # Red (Taken)
        [0, 255, 0],    # Green (Added)
        [0, 0, 255],    # Blue (Shifted)
    )
    height, width = onehot_mask.shape[0], onehot_mask.shape[1]
    canvas = np.zeros((height, width, 3), dtype=np.float32)
    for channel, colour in enumerate(palette):
        canvas[onehot_mask[:, :, channel] == 1] = colour
    return canvas

In [None]:

def visualize_predictions(model, dataset, batch_index=0, sample_index=0):
    """Plot one sample's inputs, ground-truth mask and predicted mask side by side.

    Args:
        model: Object with a ``predict`` method that maps a batch ``X`` to
            per-pixel scores over 4 classes.
        dataset: Indexable object; ``dataset[batch_index]`` must return
            ``(X, y_true)``, where ``X`` holds the before and after images
            concatenated along the last axis and ``y_true`` is one-hot with
            4 classes.
        batch_index: Which batch of ``dataset`` to visualise.
        sample_index: Which sample inside that batch to plot.
    """
    X, y_true = dataset[batch_index]
    X1, X2 = np.split(X, 2, axis=-1)  # Split concatenated images back into two images
    y_pred = model.predict(X)

    # Per pixel, the sum over the one-hot channels is 1 where a class was
    # assigned and 0 where the ground truth has no class at all.
    summed_mask = np.sum(y_true, axis=-1)

    # Reduce both masks to class labels, then colour-code the labels.
    y_true_labels = np.argmax(y_true, axis=-1)
    y_pred_labels = np.argmax(y_pred, axis=-1)

    def labels_to_rgb(label_batch):
        # onehot_to_rgb yields floats in {0, 255}; cast to uint8 so matplotlib
        # reads the values as 0-255 colours.
        one_hot_lookup = np.eye(4)
        return np.array([onehot_to_rgb(one_hot_lookup[labels]) for labels in label_batch]).astype(np.uint8)

    y_true_rgb = labels_to_rgb(y_true_labels)
    y_pred_rgb = labels_to_rgb(y_pred_labels)

    # Function to plot images and masks
    def plot_comparison(before_img, after_img, true_mask, pred_mask, index=0):
        fig, axs = plt.subplots(1, 5, figsize=(25, 5))

        axs[0].imshow(before_img[index])
        axs[0].set_title('Before Image')

        axs[1].imshow(after_img[index])
        axs[1].set_title('After Image')

        axs[2].imshow(true_mask[index], alpha=0.5)
        axs[2].set_title('Ground Truth Mask')

        axs[3].imshow(pred_mask[index], alpha=0.5)
        axs[3].set_title('Predicted Mask')

        # Drawn with matplotlib so it appears in the same figure as the other panels.
        axs[4].imshow(summed_mask[index], cmap='gray', vmin=0, vmax=1)
        axs[4].set_title('Summed Mask')

        for ax in axs:
            ax.axis('off')

        plt.show()

    plot_comparison(X1, X2, y_true_rgb, y_pred_rgb, index=sample_index)



In [None]:
# Visualise the first training batch.
visualize_predictions(model, train_dataset, batch_index=0)

: 

In [None]:
# def unet_model(input_size=(256, 256, 3)):
#     inputs1 = Input(input_size)
#     inputs2 = Input(input_size)
    
#     def unet_encoder(inputs):
#         conv1 = Conv2D(64, 3, activation='relu', padding='same')(inputs)
#         conv1 = Conv2D(64, 3, activation='relu', padding='same')(conv1)
#         pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
        
#         conv2 = Conv2D(128, 3, activation='relu', padding='same')(pool1)
#         conv2 = Conv2D(128, 3, activation='relu', padding='same')(conv2)
#         pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
        
#         conv3 = Conv2D(256, 3, activation='relu', padding='same')(pool2)
#         conv3 = Conv2D(256, 3, activation='relu', padding='same')(conv3)
        
#         return conv1, conv2, conv3
    
#     enc1 = unet_encoder(inputs1)
#     enc2 = unet_encoder(inputs2)
    
#     def unet_decoder(enc1, enc2):
#         merge = concatenate([enc1[2], enc2[2]])
        
#         conv4 = Conv2D(512, 3, activation='relu', padding='same')(merge)
#         conv4 = Conv2D(512, 3, activation='relu', padding='same')(conv4)
        
#         up5 = concatenate([Conv2DTranspose(256, 2, strides=(2, 2), padding='same')(conv4), enc1[1], enc2[1]])
#         conv5 = Conv2D(256, 3, activation='relu', padding='same')(up5)
#         conv5 = Conv2D(256, 3, activation='relu', padding='same')(conv5)
        
#         up6 = concatenate([Conv2DTranspose(128, 2, strides=(2, 2), padding='same')(conv5), enc1[0], enc2[0]])
#         conv6 = Conv2D(128, 3, activation='relu', padding='same')(up6)
#         conv6 = Conv2D(128, 3, activation='relu', padding='same')(conv6)
        
#         conv7 = Conv2D(64, 3, activation='relu', padding='same')(conv6)
#         conv7 = Conv2D(64, 3, activation='relu', padding='same')(conv7)
        
#         outputs = Conv2D(3, 1, activation='softmax')(conv7)  # 3 channels for color-coded output
        
#         return outputs
    
#     outputs = unet_decoder(enc1, enc2)
#     model = Model(inputs=[inputs1, inputs2], outputs=outputs)
#     return model

# model = unet_model()
# model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
# model.summary()

In [None]:
#model.fit(train_dataset, validation_data=validation_dataset, epochs=10)