This notebook trains and tests five deep learning models on a satellite image.

In [None]:
import os, random, numpy as np, matplotlib.pyplot as plt, tensorflow as tf
import rasterio, pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, jaccard_score, accuracy_score
from tensorflow.keras import layers, Model
# Seed every random number generator this notebook touches (Python's
# `random`, NumPy and TensorFlow) so that repeated runs are comparable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Input rasters: the multi-band scene and its label mask.
RAW_IMAGE = "../data/raw/20241110_053942_45_24f7_3B_AnalyticMS_SR_8b_clip.tif"
RAW_MASK  = "../data/raw/20241110_053942_45_24f7_3B_AnalyticMS_SR_8b_clip_Hybrid_mask.tif"

# Output locations: models and the metrics CSV go to OUT_DIR, per-tile
# prediction figures go under PRED_DIR. Creating PRED_DIR also creates
# OUT_DIR because it is its parent.
OUT_DIR   = "../experiments_final"
PRED_DIR  = os.path.join(OUT_DIR, "test_predictions")
os.makedirs(PRED_DIR, exist_ok=True)
def load_tif_image(path):
    """Load a raster as a float32 array scaled by its global maximum.

    All bands are read, the first axis of the array returned by
    ``src.read()`` is moved to the last position, and every value is
    divided by the maximum over the whole array (plus a small epsilon so
    an all-zero raster does not divide by zero).

    Args:
        path: Path of the raster file, opened with ``rasterio.open``.

    Returns:
        A ``np.float32`` array with the band axis last.
    """
    with rasterio.open(path) as dataset:
        bands_first = dataset.read()
    pixels = np.moveaxis(bands_first, 0, 2).astype(np.float32)
    peak = np.max(pixels)
    return pixels / (peak + 1e-8)
def load_mask(path):
    """Load band 1 of a raster and return it as a ``np.uint8`` array.

    Args:
        path: Path of the mask raster, opened with ``rasterio.open``.

    Returns:
        The first band cast to ``np.uint8``.
    """
    with rasterio.open(path) as dataset:
        first_band = dataset.read(1)
    return first_band.astype(np.uint8)
# Read the scene and its label mask into memory and report their shapes.
# NOTE(review): tiling below indexes both arrays with the same offsets, so
# they are presumably the same height/width — confirm against the data.
img, mask = load_tif_image(RAW_IMAGE), load_mask(RAW_MASK)
print(" Loaded:", img.shape, mask.shape)
def create_tiles(image, mask, tile_size=256):
    """Cut an image and its mask into non-overlapping square tiles.

    Tiles are taken on a regular grid starting at the top-left corner.
    Rows/columns that do not fill a complete ``tile_size`` tile at the
    bottom/right edge are dropped.

    Args:
        image: Array of shape ``(H, W, C)``.
        mask: Array indexed with the same row/column offsets as ``image``.
        tile_size: Edge length of each square tile, in pixels.

    Returns:
        A pair ``(tiles_img, tiles_mask)``. ``tiles_img`` has shape
        ``(N, tile_size, tile_size, C)``; each mask tile gets a trailing
        axis of length 1. When the image is smaller than one tile the
        arrays are empty but keep that rank (for a 2-D mask:
        ``(0, tile_size, tile_size, 1)``), so downstream axis-wise
        reductions still work.
    """
    height, width, channels = image.shape
    tiles_img, tiles_mask = [], []
    # Stop one full tile before the edge so only complete tiles are visited.
    for top in range(0, height - tile_size + 1, tile_size):
        for left in range(0, width - tile_size + 1, tile_size):
            rows = slice(top, top + tile_size)
            cols = slice(left, left + tile_size)
            tiles_img.append(image[rows, cols])
            tiles_mask.append(mask[rows, cols][..., np.newaxis])

    if not tiles_img:
        # np.array([]) would collapse to shape (0,); keep the 4-D layout.
        empty_img = np.empty((0, tile_size, tile_size, channels), dtype=image.dtype)
        empty_mask = np.empty((0, tile_size, tile_size, 1), dtype=mask.dtype)
        return empty_img, empty_mask
    return np.array(tiles_img), np.array(tiles_mask)
# Tile the scene, then keep only tiles whose mask has a non-zero pixel.
tiles_img, tiles_mask = create_tiles(img, mask)
print(" Total Tiles:", tiles_img.shape, tiles_mask.shape)
non_empty = tiles_mask.sum(axis=(1, 2, 3)) > 0
tiles_img = tiles_img[non_empty]
tiles_mask = tiles_mask[non_empty]
print(" Non-empty Tiles:", tiles_img.shape)

# Two-stage random split with a fixed seed.
# NOTE(review): test_size=0.75 leaves 25% for training; the remainder is
# split 1/3 validation and 2/3 test, i.e. roughly 25/25/50 overall.
# Confirm this proportion is intended.
trainX, tempX, trainY, tempY = train_test_split(
    tiles_img, tiles_mask, test_size=0.75, random_state=SEED
)
valX, testX, valY, testY = train_test_split(
    tempX, tempY, test_size=2/3, random_state=SEED
)
print(f" Split — Train: {len(trainX)}, Val: {len(valX)}, Test: {len(testX)}")
def focal_loss(alpha=0.25, gamma=2.0):
    """Build a binary focal-loss function usable with ``model.compile``.

    The per-pixel binary cross-entropy is down-weighted by
    ``(1 - p_t) ** gamma``, where ``p_t`` is the probability the model
    assigns to the true class, and scaled by ``alpha``. ``alpha`` is
    applied uniformly to both classes (it is a global scale here, not a
    class-balancing weight).

    Args:
        alpha: Constant factor applied to every pixel's loss.
        gamma: Focusing exponent; larger values suppress easy pixels more.

    Returns:
        A callable ``loss(y_true, y_pred)`` returning a scalar tensor.
    """
    def loss(y_true, y_pred):
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        # Clip so the logarithms below stay finite.
        eps = tf.keras.backend.epsilon()
        y_pred = tf.clip_by_value(y_pred, eps, 1.0 - eps)
        # Element-wise cross-entropy with the same shape as y_pred.
        # tf.keras.losses.binary_crossentropy averages over the last axis,
        # which drops a dimension and makes the product with p_t either
        # fail to broadcast or broadcast to the wrong shape.
        bce = -(y_true * tf.math.log(y_pred)
                + (1.0 - y_true) * tf.math.log(1.0 - y_pred))
        p_t = y_true * y_pred + (1.0 - y_true) * (1.0 - y_pred)
        return tf.reduce_mean(alpha * tf.pow(1.0 - p_t, gamma) * bce)
    return loss
def conv_block(x, filters):
    """Apply two (3x3 Conv2D + ReLU, then BatchNormalization) stages.

    Args:
        x: Input Keras tensor.
        filters: Number of filters for both convolutions.

    Returns:
        The output tensor of the second batch-normalization layer.
    """
    for _ in range(2):
        x = layers.Conv2D(filters, 3, padding="same", activation="relu")(x)
        x = layers.BatchNormalization()(x)
    return x
def residual_block(x, filters):
    """Two-convolution residual unit with a 1x1 projection shortcut.

    The shortcut is a 1x1 convolution of the input so its channel count
    matches the main path; the two paths are added and passed through ReLU.

    Args:
        x: Input Keras tensor.
        filters: Number of filters for every convolution in the block.

    Returns:
        The ReLU-activated sum of the main path and the shortcut.
    """
    shortcut = layers.Conv2D(filters, 1, padding="same")(x)

    main = layers.Conv2D(filters, 3, padding="same", activation="relu")(x)
    main = layers.BatchNormalization()(main)
    main = layers.Conv2D(filters, 3, padding="same")(main)
    main = layers.BatchNormalization()(main)

    merged = layers.Add()([main, shortcut])
    return layers.Activation("relu")(merged)
def attention_gate(x, g, filters):
    """Re-weight skip features ``x`` with a mask computed from ``x`` and ``g``.

    Both inputs are projected with 1x1 convolutions, summed, passed through
    ReLU, and reduced to a single-channel sigmoid map that multiplies ``x``.
    The two inputs are added element-wise, so they must be shape-compatible
    after projection.

    Args:
        x: Skip-connection tensor to be gated.
        g: Gating tensor.
        filters: Channel count of the intermediate projections.

    Returns:
        ``x`` multiplied by the single-channel attention map.
    """
    gate_proj = layers.Conv2D(filters, 1, padding="same")(g)
    skip_proj = layers.Conv2D(filters, 1, padding="same")(x)
    relu = layers.Activation("relu")
    combined = relu(layers.Add()([gate_proj, skip_proj]))
    coeffs = layers.Conv2D(1, 1, padding="same", activation="sigmoid")(combined)
    return layers.Multiply()([x, coeffs])
def build_unet(shape):
    """Build a four-level U-Net with a single-channel sigmoid output.

    Encoder: ``conv_block`` + max-pooling at 64/128/256/512 filters, then a
    1024-filter bottleneck. Decoder: transposed-convolution upsampling,
    concatenation with the matching encoder output, and ``conv_block``.

    Args:
        shape: Input shape passed to ``layers.Input``.

    Returns:
        A Keras ``Model`` named ``"UNet"``.
    """
    inputs = layers.Input(shape)

    # Encoder: remember each pre-pooling feature map for the skip links.
    x = inputs
    skips = []
    for filters in (64, 128, 256, 512):
        features = conv_block(x, filters)
        skips.append(features)
        x = layers.MaxPooling2D()(features)

    x = conv_block(x, 1024)

    # Decoder: walk the skips from deepest to shallowest.
    for filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        x = layers.Conv2DTranspose(filters, 2, strides=2, padding="same")(x)
        x = layers.concatenate([x, skip])
        x = conv_block(x, filters)

    outputs = layers.Conv2D(1, 1, activation="sigmoid")(x)
    return Model(inputs, outputs, name="UNet")
def build_resunet(shape):
    """Build a four-level U-Net whose stages are ``residual_block`` units.

    Same encoder/bottleneck/decoder layout as a plain U-Net
    (64/128/256/512 filters, 1024 at the bottleneck), with every stage
    implemented by ``residual_block``.

    Args:
        shape: Input shape passed to ``layers.Input``.

    Returns:
        A Keras ``Model`` named ``"ResUNet"``.
    """
    inputs = layers.Input(shape)

    # Encoder: keep each pre-pooling feature map for the skip links.
    x = inputs
    skips = []
    for filters in (64, 128, 256, 512):
        features = residual_block(x, filters)
        skips.append(features)
        x = layers.MaxPooling2D()(features)

    x = residual_block(x, 1024)

    # Decoder: upsample, merge with the matching skip, refine.
    for filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        x = layers.Conv2DTranspose(filters, 2, strides=2, padding="same")(x)
        x = layers.concatenate([x, skip])
        x = residual_block(x, filters)

    outputs = layers.Conv2D(1, 1, activation="sigmoid")(x)
    return Model(inputs, outputs, name="ResUNet")
def build_attnunet(shape):
    """Build a four-level U-Net with attention-gated skip connections.

    Each decoder stage upsamples with a transposed convolution, uses that
    upsampled tensor as the gating signal for ``attention_gate`` on the
    matching encoder output, concatenates the two, and applies
    ``conv_block``.

    Args:
        shape: Input shape passed to ``layers.Input``.

    Returns:
        A Keras ``Model`` named ``"AttnUNet"``.
    """
    inputs = layers.Input(shape)

    # Encoder: keep each pre-pooling feature map for the skip links.
    x = inputs
    skips = []
    for filters in (64, 128, 256, 512):
        features = conv_block(x, filters)
        skips.append(features)
        x = layers.MaxPooling2D()(features)

    x = conv_block(x, 1024)

    # Decoder: the upsampled tensor doubles as the attention gating signal.
    for filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        gate = layers.Conv2DTranspose(filters, 2, strides=2, padding="same")(x)
        attended = attention_gate(skip, gate, filters)
        x = layers.concatenate([gate, attended])
        x = conv_block(x, filters)

    outputs = layers.Conv2D(1, 1, activation="sigmoid")(x)
    return Model(inputs, outputs, name="AttnUNet")
def build_attnresunet(shape):
    """Build a U-Net combining ``residual_block`` stages and attention gates.

    Encoder, bottleneck and decoder stages all use ``residual_block``; each
    skip connection is filtered by ``attention_gate`` using the upsampled
    decoder tensor as the gating signal before concatenation.

    Args:
        shape: Input shape passed to ``layers.Input``.

    Returns:
        A Keras ``Model`` named ``"AttnResUNet"``.
    """
    inputs = layers.Input(shape)

    # Encoder: keep each pre-pooling feature map for the skip links.
    x = inputs
    skips = []
    for filters in (64, 128, 256, 512):
        features = residual_block(x, filters)
        skips.append(features)
        x = layers.MaxPooling2D()(features)

    x = residual_block(x, 1024)

    # Decoder: gate each skip with the freshly upsampled tensor.
    for filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        gate = layers.Conv2DTranspose(filters, 2, strides=2, padding="same")(x)
        attended = attention_gate(skip, gate, filters)
        x = layers.concatenate([gate, attended])
        x = residual_block(x, filters)

    outputs = layers.Conv2D(1, 1, activation="sigmoid")(x)
    return Model(inputs, outputs, name="AttnResUNet")
def build_asdms(shape):
    """Build a U-Net whose output averages predictions from all decoder stages.

    The backbone is a four-level U-Net built from ``conv_block``
    (64/128/256/512 filters, 1024 at the bottleneck). Every decoder stage
    gets its own 1x1 sigmoid head. The three coarser heads sit at 1/8, 1/4
    and 1/2 of the input resolution, so they are bilinearly upsampled to
    full resolution before the four maps are averaged; ``layers.Average``
    needs all of its inputs to share one shape.

    Args:
        shape: Input shape passed to ``layers.Input``. Height and width
            should be divisible by 16 so the skip connections line up
            after four pooling steps.

    Returns:
        A Keras ``Model`` named ``"ASDMS"`` with one single-channel output
        at the input resolution.
    """
    inputs = layers.Input(shape)

    # Encoder: keep each pre-pooling feature map for the skip links.
    x = inputs
    skips = []
    for filters in (64, 128, 256, 512):
        features = conv_block(x, filters)
        skips.append(features)
        x = layers.MaxPooling2D()(features)

    x = conv_block(x, 1024)

    # Decoder: collect every stage so each can feed a side output.
    decoder_stages = []
    for filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        x = layers.Conv2DTranspose(filters, 2, strides=2, padding="same")(x)
        x = layers.concatenate([x, skip])
        x = conv_block(x, filters)
        decoder_stages.append(x)

    # Side outputs: bring each head to full resolution before averaging.
    # Bilinear interpolation of sigmoid outputs keeps values in [0, 1].
    side_outputs = []
    for stage, scale in zip(decoder_stages, (8, 4, 2, 1)):
        head = layers.Conv2D(1, 1, activation="sigmoid")(stage)
        if scale > 1:
            head = layers.UpSampling2D(size=scale, interpolation="bilinear")(head)
        side_outputs.append(head)

    outputs = layers.Average()(side_outputs)
    return Model(inputs, outputs, name="ASDMS")
def train_and_predict(name, builder, X_train, Y_train, X_val, Y_val, X_test, Y_test):
    """Train one model, save it, score it on the test set and plot predictions.

    The model's input shape is taken from ``X_train`` rather than from a
    hard-coded tile size or a module-level array, so the function works
    for whatever tile size and band count it is given.

    Side effects: writes ``<OUT_DIR>/<name>.keras`` and one PNG per test
    tile under ``<PRED_DIR>/<name>/``.

    Args:
        name: Label used in log output, file names and plot titles.
        builder: Callable taking an input shape and returning a Keras model.
        X_train, Y_train: Training tiles and masks.
        X_val, Y_val: Validation tiles and masks.
        X_test, Y_test: Test tiles and masks.

    Returns:
        A dict with keys ``"IoU"``, ``"F1"`` and ``"Acc"`` computed over
        all test pixels after thresholding predictions at 0.5.
    """
    # Per-sample shape (everything after the batch axis).
    model = builder(tuple(X_train.shape[1:]))
    model.compile(optimizer='adam', loss=focal_loss(), metrics=['accuracy'])
    print(f" Training {name}...")
    model.fit(X_train, Y_train, validation_data=(X_val, Y_val), epochs=60, batch_size=4, verbose=1)
    model.save(os.path.join(OUT_DIR, f"{name}.keras"))

    preds = (model.predict(X_test, verbose=0) > 0.5).astype(np.uint8)

    # Flatten once; the three metrics all score the same pixel vectors.
    # NOTE(review): the default binary averaging assumes mask values are
    # {0, 1} — confirm against the mask raster.
    y_true = Y_test.flatten()
    y_pred = preds.flatten()
    metrics = {
        "IoU": jaccard_score(y_true, y_pred, zero_division=0),
        "F1": f1_score(y_true, y_pred, zero_division=0),
        "Acc": accuracy_score(y_true, y_pred)
    }

    model_dir = os.path.join(PRED_DIR, name)
    os.makedirs(model_dir, exist_ok=True)
    for i, (x, y, p) in enumerate(zip(X_test, Y_test, preds)):
        fig, axs = plt.subplots(1, 3, figsize=(10, 4))
        # Show the first three bands, min-max stretched for display only.
        # NOTE(review): the first three bands may not be red/green/blue
        # for this sensor — confirm the band order.
        rgb = x[..., :3]
        rgb = (rgb - rgb.min()) / (rgb.max() - rgb.min() + 1e-8)
        axs[0].imshow(rgb); axs[0].set_title("Satellite"); axs[0].axis("off")
        axs[1].imshow(y.squeeze(), cmap="gray"); axs[1].set_title("Ground Truth"); axs[1].axis("off")
        axs[2].imshow(p.squeeze(), cmap="gray"); axs[2].set_title(f"{name} Prediction"); axs[2].axis("off")
        fig.tight_layout()
        fig.savefig(os.path.join(model_dir, f"tile_{i:04d}.png"), dpi=150)
        # Close this specific figure so figures do not pile up in memory.
        plt.close(fig)
    return metrics
# Registry of the architectures to compare, keyed by the short name used
# for saved models, prediction folders and the results table.
builders = dict(
    unet=build_unet,
    resunet=build_resunet,
    attnunet=build_attnunet,
    attnresunet=build_attnresunet,
    asdms=build_asdms,
)

# Train and evaluate every architecture on the same split.
results = {}
for name, builder in builders.items():
    results[name] = metrics = train_and_predict(
        name, builder, trainX, trainY, valX, valY, testX, testY
    )
    print(f" {name}: IoU={metrics['IoU']:.4f}, F1={metrics['F1']:.4f}, Acc={metrics['Acc']:.4f}")

# One row per model, one column per metric.
df = pd.DataFrame(results).T
csv_path = os.path.join(OUT_DIR, "results_summary.csv")
df.to_csv(csv_path)
print("\n Metrics saved to:", csv_path)
print(df)
print("\n All test predictions saved to:", PRED_DIR)
