In [None]:
import os
import cv2
import numpy as np

# Function to load and preprocess the images
def load_and_preprocess_images(directory, image_size=(256, 256),crop_size=256):
    images = []
    for filename in os.listdir(directory):
        if(len(images)==2500):
            break
        if filename.endswith(".jpg") or filename.endswith(".png"):
            image_path = os.path.join(directory, filename)
            image = cv2.imread(image_path)
            #image = cv2.resize(image, image_size)  # Resize the image to the desired size
            image = image.astype(np.float32) / 255.0  # Normalize pixel values to [0, 1]
            images.append(image)
    return np.array(images)

# Load and preprocess the train and test images
train_pre_images = load_and_preprocess_images("/kaggle/input/levir-cropped/Cropped/Train/A")
train_post_images = load_and_preprocess_images("/kaggle/input/levir-cropped/Cropped/Train/B")
train_change_maps = load_and_preprocess_images("/kaggle/input/levir-cropped/Cropped/Train/label")

# test_pre_images = load_and_preprocess_images("/kaggle/input/s2looking/LEVIR-CD+/LEVIR-CD+/test/A")
# test_post_images = load_and_preprocess_images("/kaggle/input/s2looking/LEVIR-CD+/LEVIR-CD+/test/B")
# test_change_maps = load_and_preprocess_images("/kaggle/input/s2looking/LEVIR-CD+/LEVIR-CD+/test/label")

# Make sure the number of images in train and test folders match
#assert len(train_pre_images) == len(train_post_images) == len(train_change_maps), "Number of train images should be the same in all folders."
# assert len(test_pre_images) == len(test_post_images) == len(test_change_maps), "Number of test images should be the same in all folders."

# Print the shape of the loaded images
print("Train Pre images shape:", train_pre_images.shape)
print("Train Post images shape:", train_post_images.shape)
print("Train Change maps shape:", train_change_maps.shape)

# print("Test Pre images shape:", test_pre_images.shape)
# print("Test Post images shape:", test_post_images.shape)
# print("Test Change maps shape:", test_change_maps.shape)


def preprocessing(train_change_maps):
    # Assuming you have a NumPy array 'label_images' containing your label images

    # Apply the threshold to set all values greater than 0 to 1
    thresholded_labels = np.where(train_change_maps > 0, 1, train_change_maps)

    # 'thresholded_labels' now contains label images with all values greater than 0 as class 1
    # Assuming you have a NumPy array 'binary_images' containing your binary images
    # Each image is a 2D array with 0s and 1s

    # Define the kernel for erosion and dilation (structuring element)
    kernel = np.ones((3, 3), np.uint8)  # You can adjust the size as needed

    # Create empty arrays to store preprocessed images
    eroded_images = []
    dilated_images = []

    for image in thresholded_labels:
        # Erosion
        eroded_image = cv2.erode(image, kernel, iterations=2)
        eroded_images.append(eroded_image)

        # Dilation
        dilated_image = cv2.dilate(eroded_image, kernel, iterations=2)
        dilated_images.append(dilated_image)

    # Now 'eroded_images' contains the images after erosion, and 'dilated_images' contains the images after dilation

        # Convert the list of dilated images to a NumPy array
    dilated_images_array = np.array(dilated_images)
        
    return dilated_images_array
    

In [None]:
import numpy as np

dilated_images_array = preprocessing(train_change_maps)


In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(15, 9))
plt.subplot(4, 1, 1)
plt.imshow(train_pre_images[176, :, :, 0:3])
plt.figure(figsize=(15, 9))
plt.subplot(4, 1, 1)
plt.imshow(train_post_images[176, :, :, 0:3])
plt.figure(figsize=(15, 9))
plt.subplot(4, 1, 1)
plt.imshow(train_change_maps[176, :, :, 0],cmap='gray')
plt.title("Before Preprocessing")
plt.subplot(4, 1, 2)
plt.imshow(dilated_images_array[176, :, :, 0],cmap='gray')
plt.title("After Preprocessing")

In [None]:
# from tensorflow.keras.preprocessing.image import ImageDataGenerator

# # Create an ImageDataGenerator for data augmentation
# datagen = ImageDataGenerator(
#     rotation_range=50,
#     width_shift_range=0.1,
#     height_shift_range=0.1,
#     shear_range=0.2,
#     zoom_range=0.2,
#     horizontal_flip=True,
#     vertical_flip = True,
#     fill_mode='nearest'
# )

In [None]:
from sklearn.model_selection import train_test_split

# Split the data for train_pre, train_post, and labels
x_train_pre, x_valid_pre, x_train_post, x_valid_post, y_train, y_valid = train_test_split(train_pre_images, train_post_images, dilated_images_array, test_size=0.2, shuffle=True, random_state=42)

# You now have:
# - x_train_pre and y_train for training pre features and labels
# - x_valid_pre and y_valid for validation pre features and labels
# - x_train_post for training post features
# - x_valid_post for validation post features


In [None]:
print(x_train_pre.shape,y_train.shape,x_valid_pre.shape,y_valid.shape)

In [None]:
# del x_train_pre
# del x_train_post
# del y_train
# del x_valid_pre
# del x_valid_post
# del y_valid

In [None]:
del train_pre_images
del train_post_images
del train_change_maps
del dilated_images_array

In [None]:
from tensorflow.keras import backend as K

# recall
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

# precision
def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

#f1 score
def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))

In [None]:
def channel_attention_module(x, ratio=8):
    batch, _, _, channel = x.shape

    ## Shared layers
    l1 = Dense(channel//ratio, activation="relu", use_bias=False)
    l2 = Dense(channel, use_bias=False)

    ## Global Average Pooling
    x1 = GlobalAveragePooling2D()(x)
    x1 = l1(x1)
    x1 = l2(x1)

    ## Global Max Pooling
    x2 = GlobalMaxPooling2D()(x)
    x2 = l1(x2)
    x2 = l2(x2)

    ## Add both the features and pass through sigmoid
    feats = x1 + x2
    feats = Activation("sigmoid")(feats)
    feats = Multiply()([x, feats])

    return feats

In [None]:
def spatial_attention_module(x):
    ## Average Pooling
    x1 = tf.reduce_mean(x, axis=-1)
    x1 = tf.expand_dims(x1, axis=-1)

    ## Max Pooling
    x2 = tf.reduce_max(x, axis=-1)
    x2 = tf.expand_dims(x2, axis=-1)

    ## Concatenat both the features
    feats = Concatenate()([x1, x2])
    ## Conv layer
    feats = Conv2D(1, kernel_size=7, padding="same", activation="sigmoid")(feats)
    feats = Multiply()([x, feats])

    return feats

In [None]:
def cbam(x):
    x = channel_attention_module(x)
    x = spatial_attention_module(x)
    return x

In [None]:
import tensorflow as tf
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, ReLU, Add, MaxPooling2D, GlobalAveragePooling2D, Dense
from tensorflow.keras.models import Model
from tensorflow.keras import layers, regularizers
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, Concatenate, BatchNormalization, Activation, Dropout

def residual_block(x, filters, stride=1):
    shortcut = x
    x = Conv2D(filters, kernel_size=(3, 3), strides=stride, padding='same')(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    x = Conv2D(filters, kernel_size=(3, 3), strides=1,padding='same')(x)
    x = BatchNormalization()(x)

    if stride != 1 or shortcut.shape[-1] != filters:
        shortcut = Conv2D(filters, kernel_size=(1, 1), strides=stride, padding='same')(shortcut)
        shortcut = BatchNormalization()(shortcut)

    x = Add()([x, shortcut])
    x = ReLU()(x)
    return x

def build_resnet18(input_shape):
    input_tensor = Input(shape=input_shape)
    
    x = Conv2D(64, kernel_size=(3, 3), strides=2, padding='same')(input_tensor)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    x = MaxPooling2D(pool_size=(3, 3), strides=2, padding='same')(x)
    
    x = residual_block(x, 64)
    x = residual_block(x, 64)
    feature_map1 = x
    x = residual_block(x, 128, 2)
    x = residual_block(x, 128)
    feature_map2 = x
    x = residual_block(x, 256, 2)
    x = residual_block(x, 256)
    feature_map3 = x
    x = residual_block(x, 512, 2)
    x = residual_block(x, 512)
    feature_map4 = cbam(x) #Added Attention Module


    model = Model(inputs=input_tensor, outputs=[feature_map1, feature_map2, feature_map3, feature_map4], name='resnet18')

    return model


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Activation, Add,UpSampling2D, GlobalMaxPooling2D

# Define a convolutional block with 3x3 Conv, BN, and ReLU
def conv_block(x, filters):
    x = Conv2D(filters, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    return x

# Define a residual block with two consecutive convolutional layers
def residual_block_LFFM(x, filters):
    shortcut = x  # Store the shortcut connection

    # First convolutional layer
    x1 = conv_block(x, filters)

    # Second convolutional layer
    x2 = Conv2D(filters, (3, 3), padding='same')(x1)
    x2 = BatchNormalization()(x2)

    # Apply ReLU activation
    #x = Activation('relu')(x)
    
    return x2

# Create a Keras model
def LFFM(height, width, num_channels,concatenate):

    # Define constants
    num_channels = num_channels  # Number of channels (Ci)
    num_residual_blocks = 2  # Number of residual blocks in the first stage
    Fcat = concatenate

    Feature1 = residual_block_LFFM(Fcat, num_channels)
    Fcat_conv = Conv2D(num_channels, (3, 3), padding='same')(Fcat)

    # Apply 3x3 Conv and BN operations on Feature1 and Fcat
    # Element-wise summation operation to obtain Feature2
    Feature2 = Activation('relu')(tf.add(Feature1, Fcat_conv))

    # Change the number of channels to 96
    num_channels_2 = 96

    # Apply the first convolution layer to obtain Feature3
    Feature3 = residual_block_LFFM(Feature2, num_channels_2)
    
    Feature2_conv = Conv2D(num_channels_2, (3, 3), padding='same')(Feature2)
    
    #print(Feature3.shape,Feature2_conv.shape)

    # Apply the second convolution layer to obtain Feature4
    Feature4 = Activation('relu')(tf.add(Feature3,Feature2_conv))
    
    #print(Feature4.shape)
    
    return Feature4

# You have Feature4 ready for further processing


In [None]:
def MSFA(F1):
    global_avg_pooling_layer = tf.keras.layers.GlobalAveragePooling2D()
    F1_gap = global_avg_pooling_layer(F1)
    F1_gap = tf.expand_dims(tf.expand_dims(F1_gap, 1), 1)
    F1_conv = tf.keras.layers.Conv2D(filters=384, kernel_size=(1, 1))(F1_gap)
    #print(F1.shape)
    F1_conv_relu = tf.keras.layers.ReLU()(F1_conv)
    F2 = tf.keras.layers.Conv2D(filters=384, kernel_size=(1, 1))(F1_conv_relu)
    #print(F2.shape)
    concatenated_F1 = tf.concat([F1], axis=-1)
    F3 = F2 * concatenated_F1
    return F3

In [None]:
def classifier(input_shape,F3):
    c1 = tf.keras.layers.Conv2D(384, (3, 3), padding='same', input_shape=input_shape)(F3)
    c1 = tf.keras.layers.BatchNormalization()(c1)
    c1 = tf.keras.layers.ReLU()(c1)
    c1 = tf.keras.layers.Dropout(0.5)(c1)
    
    c2 = tf.keras.layers.Conv2D(384, (1, 1), padding='same')(c1)
    c2 = tf.keras.layers.BatchNormalization()(c2)
    c2 = tf.keras.layers.ReLU()(c2)
    
    up = tf.keras.layers.UpSampling2D(size=(2, 2))(c2)
    up2 = tf.keras.layers.UpSampling2D(size=(2, 2))(up)
    
    result = tf.keras.layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(up2)
    return result

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import GlobalAveragePooling2D, GlobalMaxPooling2D, Reshape, Dense, Input
from tensorflow.keras.layers import Activation, Concatenate, Conv2D, Multiply

In [None]:
def siamese_autoencoder(input_shape):
    pre_input = Input(shape=input_shape, name='pre_input')
    post_input = Input(shape=input_shape, name='post_input')

    encoder = build_resnet18(input_shape)

    pre_encoded = encoder(pre_input)
    post_encoded = encoder(post_input)
    diff_encoded = [tf.abs(tf.subtract(pre_enc, post_enc)) for pre_enc, post_enc in zip(pre_encoded, post_encoded)]
    
    f_concat_1 = [tf.concat([pre_enc, post_enc], axis=-1) for pre_enc, post_enc in zip(pre_encoded, post_encoded)]
    
    f_concat_final = [tf.concat([pre_enc, post_enc], axis=-1) for pre_enc, post_enc in zip(f_concat_1, diff_encoded)]
    
    #print(len(f_concat_final))
    LFFM_maps = []
    for i in f_concat_final:
        f = LFFM(i.shape[1], i.shape[2], i.shape[2],i)
        #print(f.shape)
        LFFM_maps.append(f)
    #print(len(LFFM_maps))
    
    up1 = UpSampling2D(size=(2, 2))(LFFM_maps[-1])
    up1 = cbam(up1) #Added Attention Layer
    concat1 = tf.concat([LFFM_maps[-2],up1],axis=-1)
    
    up2 = UpSampling2D(size=(2, 2))(concat1)
    up2 = cbam(up2) #Added Attention Layer
    concat2 = tf.concat([LFFM_maps[-3],up2],axis=-1)
    
    up3 = UpSampling2D(size=(2, 2))(concat2)
    final_concat = tf.concat([LFFM_maps[-4],up3],axis=-1)
    #print(final_concat.shape)
    
    F3 = MSFA(final_concat)
    print(F3.shape)
    result = classifier((32,32,384),F3)
    print(result.shape)
    
    return Model(inputs=[pre_input, post_input], outputs=result)
        

# Example usage:
input_shape = (256, 256, 3)
model = siamese_autoencoder(input_shape)
model.summary()

In [None]:
import tensorflow as tf
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor = 'val_accuracy',
    patience = 50,
    verbose = 1
)
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor = 'val_accuracy',
    patience = 50,
    verbose = 1,
    restore_best_weights = True
)
chkp = tf.keras.callbacks.ModelCheckpoint(
    'EDM_checkpoint.h5',
    monitor='val_accuracy',
    verbose=1,
    save_best_only=True
)

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np

from sklearn.utils import shuffle


# def dice_loss(y_true, y_pred):
#   y_true = tf.cast(y_true, tf.float32)
#   y_pred = tf.math.sigmoid(y_pred)
#   numerator = 2 * tf.reduce_sum(y_true * y_pred)
#   denominator = tf.reduce_sum(y_true + y_pred)
#   return 1 - numerator / denominator


import tensorflow as tf

def batch_balanced_contrastive_loss(y_true, y_pred, margin=2.2,lambda_=0.7):
    # Extract D* and M* from y_pred
    D_star = y_pred
    M_star = y_true
    
    nu = tf.reduce_sum(1 - M_star)
    nc = tf.reduce_sum(M_star)

    # Calculate the contrastive loss for positive pairs (first term)   #Handles false postives
    contrastive_loss = lambda_ * (1 / nu + 1e-8) * tf.reduce_sum(
        (1 - M_star) *(D_star)
    )

    # Calculate the margin loss for negative pairs (second term)   #Handles false negatives
    negative_loss = (1 - lambda_) * (1 / nc + 1e-8) * tf.reduce_sum(
        M_star * tf.maximum(0.0, margin - D_star)
    )

    # Total loss is the sum of the contrastive and negative losses
    total_loss = contrastive_loss + negative_loss

    return total_loss


In [None]:
def lr_schedule(epoch):
    if epoch < 100:
        return 0.001
    else:
        return 0.001 - (0.001 / 100) * (epoch - 100)


opt = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.5, beta_2=0.999)

model.compile(optimizer=opt, loss=batch_balanced_contrastive_loss, metrics=['accuracy', f1_m, precision_m, recall_m])

lr_scheduler = tf.keras.callbacks.LearningRateScheduler(lr_schedule)

history = model.fit([x_train_pre, x_train_post],
          y_train,
          batch_size=16,  
          epochs=180,
          validation_data=([x_valid_pre, x_valid_post],y_valid),
          validation_batch_size = 16,
          callbacks=[chkp,lr_scheduler])

In [None]:
model.save('final_model.h5')

Load the model

In [None]:
model = tf.keras.models.load_model('/kaggle/input/modelwithweight/final_model.h5')

In [None]:
import matplotlib.pyplot as plt
fig,((ax11,ax12),(ax13,ax14)) = plt.subplots(2,2,figsize=(20,15))
ax11.plot(history.history['loss'])
ax11.plot(history.history['val_loss'])
ax11.title.set_text('model loss')
ax11.set_ylabel('loss')
ax11.set_xlabel('epoch')
ax11.legend(['train', 'validation'], loc='upper left')

ax12.plot(history.history['precision_m'])
ax12.plot(history.history['val_precision_m'])
ax12.set_title('model precision')
ax12.set_ylabel('precision')
ax12.set_xlabel('epoch')
ax12.legend(['train', 'validation'], loc='upper left')

ax13.plot(history.history['recall_m'])
ax13.plot(history.history['val_recall_m'])
ax13.set_title('model recall')
ax13.set_ylabel('recall')
ax13.set_xlabel('epoch')
ax13.legend(['train', 'validation'], loc='upper left')

ax14.plot(history.history['f1_m'])
ax14.plot(history.history['val_f1_m'])
ax14.set_title('model f1')
ax14.set_ylabel('f1')
ax14.set_xlabel('epoch')
ax14.legend(['train', 'validation'], loc='upper left')

In [None]:
plt.figure(figsize=[6,4])
plt.plot(history.history['accuracy'], 'black', linewidth=2.0)
plt.plot(history.history['val_accuracy'], 'blue', linewidth=2.0)
plt.legend(['Training Accuracy', 'Validation Accuracy'], fontsize=14)
plt.xlabel('Epochs', fontsize=10)
plt.ylabel('Accuracy', fontsize=10)
plt.title('Accuracy Curves', fontsize=12)

In [None]:
loss, accuracy, f1_score, precision, recall = model.evaluate([x_valid_pre,x_valid_post],y_valid,batch_size=16, verbose=0)
print(loss, accuracy, f1_score, precision, recall)

In [None]:
threshold = 0.25
pred_img = model.predict([x_valid_pre,x_valid_post],batch_size=16)
#print(pred_img)
#pred_img = (pred_img > threshold).astype(np.uint8)
#print(pred_img[0])

In [None]:
import matplotlib.pyplot as plt
sample_index = 7
# sample_pre_image = test_pre_images[sample_index]
# sample_post_image = test_post_images[sample_index]

# # Expand dimensions to match the model's input shape
# sample_pre_image = np.expand_dims(sample_pre_image, axis=0)
# sample_post_image = np.expand_dims(sample_post_image, axis=0)

# # Use the trained model to predict the change map
# predicted_change_map = model.predict([sample_pre_image, sample_post_image])

#Plot the original pre and post images
plt.figure(figsize=(15, 9))
plt.subplot(4, 1, 1)
plt.imshow(x_valid_pre[sample_index, :, :, 0:3])
plt.title("Pre Image")
plt.subplot(4, 1, 2)
plt.imshow(x_valid_post[sample_index, :, :, 0:3])
plt.title("Post Image")

plt.subplot(4, 1, 3)
plt.imshow(y_valid[sample_index, :, :, 0], cmap='gray')
plt.title("Actual Change Map")

plt.subplot(4, 1, 4)
plt.imshow(pred_img[sample_index, :, :, 0], cmap='gray')
plt.title("Predicted Change Map")

plt.tight_layout()
plt.show()

# Calculate the total area (assuming each pixel represents a unit area)
total_area = pred_img[sample_index, :, :, 0].size

# Count the changed pixels (pixels with value 1)
changed_pixels = np.count_nonzero(pred_img[sample_index, :, :, 0] == 1)

# Estimate the percentage of damage
percentage_of_damage = (changed_pixels / total_area) * 100

print(f"Percentage of Damage: {percentage_of_damage:.2f}%")

Testing

In [None]:
# test_pre_images = load_and_preprocess_images("/kaggle/input/s2looking/LEVIR-CD+/LEVIR-CD+/test/A")
# test_post_images = load_and_preprocess_images("/kaggle/input/s2looking/LEVIR-CD+/LEVIR-CD+/test/B")
# test_change_maps = load_and_preprocess_images("/kaggle/input/s2looking/LEVIR-CD+/LEVIR-CD+/test/label")

In [None]:
# print("Test Pre images shape:", test_pre_images.shape)
# print("Test Post images shape:", test_post_images.shape)
# print("Test Change maps shape:", test_change_maps.shape)

In [None]:
# loss, accuracy, f1_score, precision, recall = model.evaluate([test_pre_images,test_post_images],test_change_maps, verbose=0)
# print(loss, accuracy, f1_score, precision, recall)

In [None]:
# pred_img_test = model.predict([test_pre_images,test_post_images],batch_size=16)

In [None]:
# import matplotlib.pyplot as plt
# sample_index = 16
# # sample_pre_image = test_pre_images[sample_index]
# # sample_post_image = test_post_images[sample_index]

# # # Expand dimensions to match the model's input shape
# # sample_pre_image = np.expand_dims(sample_pre_image, axis=0)
# # sample_post_image = np.expand_dims(sample_post_image, axis=0)

# # # Use the trained model to predict the change map
# # predicted_change_map = model.predict([sample_pre_image, sample_post_image])

# #Plot the original pre and post images
# plt.figure(figsize=(15, 9))
# plt.subplot(4, 1, 1)
# plt.imshow(test_pre_images[sample_index, :, :, 0:3])
# plt.title("Pre Image")
# plt.subplot(4, 1, 2)
# plt.imshow(test_post_images[sample_index, :, :, 0:3])
# plt.title("Post Image")

# plt.subplot(4, 1, 3)
# plt.imshow(test_change_maps[sample_index, :, :, 0], cmap='gray')
# plt.title("Actual Change Map")

# plt.subplot(4, 1, 4)
# plt.imshow(pred_img_test[sample_index, :, :, 0], cmap='gray')
# plt.title("Predicted Change Map")

# plt.tight_layout()
# plt.show()

# # Calculate the total area (assuming each pixel represents a unit area)
# total_area = pred_img_test[sample_index, :, :, 0].size

# # Count the changed pixels (pixels with value 1)
# changed_pixels = np.count_nonzero(pred_img_test[sample_index, :, :, 0] == 1)

# # Estimate the percentage of damage
# percentage_of_damage = (changed_pixels / total_area) * 100

# print(f"Percentage of Damage: {percentage_of_damage:.2f}%")