In [1]:
from tensorflow.keras.utils import to_categorical
import numpy as np
import os
from keras.models import *
from keras.layers import *
from keras.optimizers import *
from keras.losses import *
from keras.callbacks import ModelCheckpoint, Callback
import PIL
import warnings
import cv2
import matplotlib.pyplot as plt
from keras.preprocessing.image import ImageDataGenerator
from keras import backend as K
import glob
import sklearn
import tifffile as tiff


In [2]:
import tensorflow as tf


In [3]:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)


1 Physical GPUs, 1 Logical GPUs


In [4]:
import os
import numpy as np

DATASET_PATH_npy = "Levir-CD/npy_files/"

X1_train = np.load(os.path.join(DATASET_PATH_npy, 'X1_train.npy')).astype('float32') / 255.0
X2_train = np.load(os.path.join(DATASET_PATH_npy, 'X2_train.npy')).astype('float32') / 255.0
y_train = np.load(os.path.join(DATASET_PATH_npy, 'y_train.npy'))

X1_val = np.load(os.path.join(DATASET_PATH_npy, 'X1_val.npy')).astype('float32') / 255.0
X2_val = np.load(os.path.join(DATASET_PATH_npy, 'X2_val.npy')).astype('float32') / 255.0
y_val = np.load(os.path.join(DATASET_PATH_npy, 'y_val.npy'))

X1_test = np.load(os.path.join(DATASET_PATH_npy, 'X1_test.npy')).astype('float32') / 255.0
X2_test = np.load(os.path.join(DATASET_PATH_npy, 'X2_test.npy')).astype('float32') / 255.0
y_test = np.load(os.path.join(DATASET_PATH_npy, 'y_test.npy'))

DATASET_PATH = "Levir-CD/"

TRAIN_IMG1_DIR = os.path.join(DATASET_PATH, 'train/A/A')
TRAIN_IMG2_DIR = os.path.join(DATASET_PATH, 'train/B/B')
TRAIN_MASK_DIR = os.path.join(DATASET_PATH, 'train/L/label')

VAL_IMG1_DIR = os.path.join(DATASET_PATH, 'val/A/A')
VAL_IMG2_DIR = os.path.join(DATASET_PATH, 'val/B/B')
VAL_MASK_DIR = os.path.join(DATASET_PATH, 'val/L/label')

TEST_IMG1_DIR = os.path.join(DATASET_PATH, 'test/A/A')
TEST_IMG2_DIR = os.path.join(DATASET_PATH, 'test/B/B')
TEST_MASK_DIR = os.path.join(DATASET_PATH, 'test/L/label')


In [5]:
print(X1_train.shape)
print(X2_train.shape)
print(y_train.shape)
print(X1_val.shape)
print(X2_val.shape)
print(y_val.shape)
print(X1_test.shape)
print(X2_test.shape)
print(y_test.shape)


(445, 512, 512, 3)
(445, 512, 512, 3)
(445, 512, 512, 1)
(64, 512, 512, 3)
(64, 512, 512, 3)
(64, 512, 512, 1)
(128, 512, 512, 3)
(128, 512, 512, 3)
(128, 512, 512, 1)


In [6]:
color_dict = {0: (0),
              1: (255),
              }

def rgb_to_onehot(rgb_arr, color_dict):
    num_classes = len(color_dict)
    shape = rgb_arr.shape[:2]+(num_classes,)
    arr = np.zeros( shape, dtype=np.float32 )
    for i, cls in enumerate(color_dict):
        arr[:,:,i] = np.all(rgb_arr.reshape( (-1,1) ) == color_dict[i], axis=1).reshape(shape[:2])
    return arr

def onehot_to_rgb(onehot, color_dict):
    single_layer = np.argmax(onehot, axis=-1)
    output = np.zeros( onehot.shape[:2]+(3,) )
    for k in color_dict.keys():
        output[single_layer==k] = color_dict[k]
    return output

def get_dataset(X1_dir, X2_dir, y_dir):
    X1=[]
    X2 = []
    Y=[]
    for d in range(len(X1_dir)):
        X1.append(plt.imread(X1_dir[d]))
        X2.append(plt.imread(X2_dir[d]))
        onehot = rgb_to_onehot((plt.imread(y_dir[d]))*255,color_dict= color_dict)
        onehot=np.reshape(onehot,((1,)+onehot.shape))
        Y.append(onehot)
    X1 = np.reshape(np.array(X1),(len(X1_dir),512, 512,3))
    X2 = np.reshape(np.array(X2),(len(X1_dir),512, 512,3))
    Y = np.reshape(np.array(Y),(len(X1_dir),512, 512,2))
    return X1,X2,Y

In [21]:
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import Input
from tensorflow.keras import backend as K
import tensorflow as tf

def get_focal_loss(alpha=0.8, gamma=2.0):
    def focal_loss(y_true, y_pred):
        epsilon = K.epsilon()  # small value to avoid log(0)
        y_pred = K.clip(y_pred, epsilon, 1. - epsilon)

        bce = - (y_true * K.log(y_pred) + (1 - y_true) * K.log(1 - y_pred))
        pt = tf.where(K.equal(y_true, 1), y_pred, 1 - y_pred)
        loss = alpha * K.pow(1. - pt, gamma) * bce
        return K.mean(loss)
    return focal_loss


def att_module(x):
    s = K.int_shape(x)
    x1 = GlobalAveragePooling2D()(x)
    x1 = Dense(s[-1], activation='sigmoid', kernel_initializer='random_normal')(x1)
    x1 = Reshape((1, 1, s[-1]))(x1)
    x1 = Multiply()([x, x1])
    
    x2 = Lambda(lambda x: K.max(x, axis=-1, keepdims=True))(x)
    x2 = Conv2D(s[-1], 3, padding='same', activation='sigmoid', kernel_initializer='random_normal')(x2)
    x2 = Multiply()([x, x2])
    
    return Add()([x1, x2])

def conv_block(x, f):
    c = Conv2D(f, 3, activation='relu', padding='same', kernel_initializer='random_normal')(x)
    c = Conv2D(f, 3, activation='relu', padding='same', kernel_initializer='random_normal')(c)
    c = Conv2D(f, 3, activation='relu', padding='same', kernel_initializer='random_normal')(c)
    return c

def conc_att_conv(x1, x2, x3, f):
    up = Conv2D(f, 2, activation='relu', padding='same', kernel_initializer='random_normal')(UpSampling2D(size=(2, 2))(x2))
    merge = concatenate([x1, up, x3], axis=3)
    att = att_module(merge)
    x = conv_block(att, f)
    return x

def get_f1(y_true, y_pred):
    y_true = K.round(K.flatten(y_true))
    y_pred = K.round(K.flatten(y_pred))
    tp = K.sum(y_true * y_pred)
    pp = K.sum(y_pred)
    ap = K.sum(y_true)
    precision = tp / (pp + K.epsilon())
    recall = tp / (ap + K.epsilon())
    return 2 * (precision * recall) / (precision + recall + K.epsilon())

def adsnet(shape=(512, 512, 3), deep_supervision=True):
    input_layer_1 = Input(shape)
    input_layer_2 = Input(shape)

    conv1 = Conv2D(8, 3, activation='relu', padding='same', kernel_initializer='random_normal')
    c11 = conv1(input_layer_1)
    c21 = conv1(input_layer_2)
    pool11 = MaxPooling2D(pool_size=(2, 2))(c11)
    pool21 = MaxPooling2D(pool_size=(2, 2))(c21)

    conv2 = Conv2D(16, 3, activation='relu', padding='same', kernel_initializer='random_normal')
    c12 = conv2(pool11)
    c22 = conv2(pool21)
    pool12 = MaxPooling2D(pool_size=(2, 2))(c12)
    pool22 = MaxPooling2D(pool_size=(2, 2))(c22)

    conv3 = Conv2D(32, 3, activation='relu', padding='same', kernel_initializer='random_normal')
    c13 = conv3(pool12)
    c23 = conv3(pool22)
    conv3 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')
    c13 = conv3(c13)
    c23 = conv3(c23)
    pool13 = MaxPooling2D(pool_size=(2, 2))(c13)
    pool23 = MaxPooling2D(pool_size=(2, 2))(c23)

    conv4 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')
    c14 = conv4(pool13)
    c24 = conv4(pool23)
    conv4 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')
    c14 = conv4(c14)
    c24 = conv4(c24)
    pool14 = MaxPooling2D(pool_size=(2, 2))(c14)
    pool24 = MaxPooling2D(pool_size=(2, 2))(c24)

    conv5 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')
    c15 = conv5(pool14)
    c25 = conv5(pool24)
    conv5 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')
    c15 = conv5(c15)
    c25 = conv5(c25)

    c5 = c15 - c25
    c4 = c14 - c24
    c3 = c13 - c23
    c2 = c12 - c22

    merge5 = concatenate([c15, c5, c25], axis=3)
    att51 = att_module(merge5)
    c51 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')(att51)
    c51 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')(c51)

    c52 = conc_att_conv(c14, c51, c24, 64)
    c53 = conc_att_conv(c13, c52, c23, 32)
    c54 = conc_att_conv(c12, c53, c22, 16)
    x3 = conc_att_conv(c11, c54, c21, 8)

    merge6 = concatenate([c14, c4, c24], axis=3)
    att61 = att_module(merge6)
    c61 = conv_block(att61, 64)
    c62 = conc_att_conv(c13, c61, c23, 32)
    c63 = conc_att_conv(c12, c62, c22, 16)
    x2 = conc_att_conv(c11, c63, c21, 8)

    merge7 = concatenate([c13, c3, c23], axis=3)
    att71 = att_module(merge7)
    c71 = conv_block(att71, 32)
    c72 = conc_att_conv(c12, c71, c22, 16)
    x4 = conc_att_conv(c11, c72, c21, 8)

    merge8 = concatenate([c12, c2, c22], axis=3)
    att81 = att_module(merge8)
    c81 = conv_block(att81, 16)
    x1 = conc_att_conv(c11, c81, c21, 8)

    x1 = Conv2D(1, (1, 1), activation='sigmoid', padding='same')(x1)
    x2 = Conv2D(1, (1, 1), activation='sigmoid', padding='same')(x2)
    x3 = Conv2D(1, (1, 1), activation='sigmoid', padding='same')(x3)
    x4 = Conv2D(1, (1, 1), activation='sigmoid', padding='same')(x4)

    if deep_supervision:
        model = Model([input_layer_1, input_layer_2], [x1, x2, x3, x4])
        model.compile(
            optimizer=Adam(learning_rate=0.0001),
            loss=[get_focal_loss()] * 4,
            metrics=[get_f1, 'accuracy']
        )
    else:
        model = Model([input_layer_1, input_layer_2], [x3])
        model.compile(
            optimizer=Adam(learning_rate=0.0001),
            loss=get_focal_loss(),
            metrics=['accuracy']
        )

    return model

ads = adsnet()
ads.summary()


Model: "model_3"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_7 (InputLayer)            [(None, 512, 512, 3) 0                                            
__________________________________________________________________________________________________
input_8 (InputLayer)            [(None, 512, 512, 3) 0                                            
__________________________________________________________________________________________________
conv2d_231 (Conv2D)             (None, 512, 512, 8)  224         input_7[0][0]                    
                                                                 input_8[0][0]                    
__________________________________________________________________________________________________
max_pooling2d_24 (MaxPooling2D) (None, 256, 256, 8)  0           conv2d_231[0][0]           

In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

model_checkpoint = ModelCheckpoint(
    filepath='Levir-CD/saved_models/adsnet3.h5',  # changed backslash to forward slash (Windows accepts both, but safer)
    monitor='val_loss',
    save_best_only=True
)

callback = EarlyStopping(
    monitor='val_loss',
    patience=20,
    restore_best_weights=True
)

history = ads.fit(
    [X1_train, X2_train],
    [y_train, y_train, y_train, y_train],  # Match 4 outputs
    validation_data=([X1_val, X2_val], [y_val, y_val, y_val, y_val]),
    epochs=50,
    batch_size=1,
    callbacks=[model_checkpoint, callback]
)


Epoch 1/50




Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50

In [None]:
from keras.models import Model
from keras.layers import Lambda, Add

def ads_extended(model_weights=None, focal_weights=[1, 1, 1, 1]):
    ads = adsnet()  
    ads.load_weights(model_weights)  

    x1 = Lambda(lambda x: x * focal_weights[0])(ads.outputs[0])
    x2 = Lambda(lambda x: x * focal_weights[1])(ads.outputs[1])
    x3 = Lambda(lambda x: x * focal_weights[2])(ads.outputs[2])
    x4 = Lambda(lambda x: x * focal_weights[3])(ads.outputs[3])

    extended_layer = Add()([x1, x2, x3, x4])

    return Model(inputs=ads.inputs, outputs=extended_layer)

x = ads_extended(
    model_weights='Levir-CD/saved_models/adsnet3.h5',
    focal_weights=[0.2466, 0.2513, 0.2516, 0.2504]
)

x.summary()


In [None]:
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, cohen_kappa_score, accuracy_score, jaccard_score, precision_score, recall_score

# Replace with your actual model variable
# model.load_weights('your_model_path.h5')

gtappend = []
predappend = []

addA = "I:/cd/levircd/test/A/A/"
addB = "I:/cd/levircd/test/B/B/"
addlabel = "I:/cd/levircd/test/L/label/"

for eg in os.listdir(addA):
    # Read raw 1024x1024 images
    imageA = plt.imread(os.path.join(addA, eg)).astype('float32') / 255.0
    imageB = plt.imread(os.path.join(addB, eg)).astype('float32') / 255.0
    GT = plt.imread(os.path.join(addlabel, eg)).astype('float32') / 255.0

    # Resize to 512x512 for model
    imageA = cv2.resize(imageA, (512, 512), interpolation=cv2.INTER_NEAREST)
    imageB = cv2.resize(imageB, (512, 512), interpolation=cv2.INTER_NEAREST)
    GT = cv2.resize(GT, (512, 512), interpolation=cv2.INTER_NEAREST)

    # Expand dims
    a = np.expand_dims(imageA, axis=0)  # shape: (1, 512, 512, 3)
    b = np.expand_dims(imageB, axis=0)  # shape: (1, 512, 512, 3)
    c = np.expand_dims(GT, axis=(0, -1))  # shape: (1, 512, 512, 1)

    # Predict
    y = model.predict([a, b])  # shape: (1, 512, 512, 2)
    y = np.squeeze(y)  # shape: (512, 512, 2)
    result = np.argmax(y, axis=-1)  # shape: (512, 512)

    # Store results
    predappend.append(result)
    gtappend.append(np.squeeze(c))  # shape: (512, 512)

# Stack and evaluate
g = np.stack(gtappend, axis=0).astype('int')  # GT masks
p = np.stack(predappend, axis=0).astype('int')  # Predicted masks

# Flatten for metric computation
gt = g.ravel()
pd = p.ravel()

# Compute metrics
print(np.unique(gt), np.unique(pd))
print("F1 SCORE:", f1_score(gt, pd))
print("Kappa:", cohen_kappa_score(gt, pd))
print("Accuracy:", accuracy_score(gt, pd))
print("Jaccard Score:", jaccard_score(gt, pd))
print("Precision:", precision_score(gt, pd))
print("Recall:", recall_score(gt, pd))


In [None]:
# import os
# import numpy as np
# import matplotlib.pyplot as plt

# TEST_IMG1_DIR = "Levir-CD/test/A/A"
# TEST_IMG2_DIR = "Levir-CD/test/B/B"
# TEST_MASK_DIR = "Levir-CD/test/L/label"

# for idx, fname in enumerate(os.listdir(TEST_IMG1_DIR)):
#     pathA = os.path.join(TEST_IMG1_DIR, fname)
#     pathB = os.path.join(TEST_IMG2_DIR, fname)
#     pathL = os.path.join(TEST_MASK_DIR, fname)

#     imgA = plt.imread(pathA)
#     imgB = plt.imread(pathB)
#     label = plt.imread(pathL)

#     if len(label.shape) == 2:
#         label = np.expand_dims(label, axis=-1)

#     print(f"\nFile: {fname}")
#     print(f"Image A: shape={imgA.shape}, dtype={imgA.dtype}, min={imgA.min()}, max={imgA.max()}")
#     print(f"Image B: shape={imgB.shape}, dtype={imgB.dtype}, min={imgB.min()}, max={imgB.max()}")
#     print(f"Label  : shape={label.shape}, dtype={label.dtype}, min={label.min()}, max={label.max()}")

#     if idx >= 4:  # check first 5 samples
#         break



File: test_1.png
Image A: shape=(1024, 1024, 3), dtype=float32, min=0.0, max=1.0
Image B: shape=(1024, 1024, 3), dtype=float32, min=0.0, max=1.0
Label  : shape=(1024, 1024, 1), dtype=float32, min=0.0, max=1.0

File: test_10.png
Image A: shape=(1024, 1024, 3), dtype=float32, min=0.04313725605607033, max=0.9411764740943909
Image B: shape=(1024, 1024, 3), dtype=float32, min=0.0, max=1.0
Label  : shape=(1024, 1024, 1), dtype=float32, min=0.0, max=1.0

File: test_100.png
Image A: shape=(1024, 1024, 3), dtype=float32, min=0.0, max=1.0
Image B: shape=(1024, 1024, 3), dtype=float32, min=0.0, max=1.0
Label  : shape=(1024, 1024, 1), dtype=float32, min=0.0, max=1.0

File: test_101.png
Image A: shape=(1024, 1024, 3), dtype=float32, min=0.0, max=1.0
Image B: shape=(1024, 1024, 3), dtype=float32, min=0.0, max=1.0
Label  : shape=(1024, 1024, 1), dtype=float32, min=0.0, max=1.0

File: test_102.png
Image A: shape=(1024, 1024, 3), dtype=float32, min=0.0, max=1.0
Image B: shape=(1024, 1024, 3), dtype=fl

In [None]:
# print("X1_train shape:", X1_train.shape)
# print("y_train shape:", y_train.shape)


X1_train shape: (445, 512, 512, 3)
y_train shape: (445, 512, 512, 1)


In [None]:
# y_train = np.load(os.path.join(DATASET_PATH_npy, 'y_train.npy')).astype('float32')
# y_val = np.load(os.path.join(DATASET_PATH_npy, 'y_val.npy')).astype('float32')
# y_test = np.load(os.path.join(DATASET_PATH_npy, 'y_test.npy')).astype('float32')
