In [1]:
import os
import numpy as np
from tensorflow.keras.utils import Sequence
import cv2
import tensorflow as tf
from tensorflow.keras import layers, Model
import torch
from transformers import AutoImageProcessor, SegformerForSemanticSegmentation
from PIL import Image
from sklearn.model_selection import train_test_split

tf.config.set_visible_devices([], 'GPU') 

In [2]:
urban_processor = AutoImageProcessor.from_pretrained("ZeeeWP/segformer-b0-finetuned-segments-satellite-terrain")
urban_model = SegformerForSemanticSegmentation.from_pretrained("ZeeeWP/segformer-b0-finetuned-segments-satellite-terrain")

def ensure_dir(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)

def apply_reversed_summer_colormap(ndvi_prediction):
    # Normalize NDVI values to the range [0, 255]
    normalized_ndvi = cv2.normalize(ndvi_prediction, None, 0, 255, cv2.NORM_MINMAX)
    
    # Reverse the normalized values (flip the color mapping)
    reversed_normalized_ndvi = 255 - normalized_ndvi  # Reversing the normalized NDVI values
    
    # Apply the summer colormap on the reversed normalized values
    colormap = cv2.applyColorMap(reversed_normalized_ndvi.astype(np.uint8), cv2.COLORMAP_SUMMER)
    
    return colormap

# Function to get urbanized areas using Segformer
def get_urban_mask(image_path):
    image = Image.open(image_path).convert("RGB")
    inputs = urban_processor(images=image, return_tensors="pt")
    with torch.no_grad():
        outputs = urban_model(**inputs)
    logits = outputs.logits
    logits = torch.nn.functional.interpolate(
        logits,
        size=image.size[::-1],  # Resize to original image size
        mode="bilinear",
        align_corners=False
    )
    segmentation = torch.argmax(logits, dim=1).squeeze().cpu().numpy()  # Convert to NumPy array
    return segmentation

# Function to slice images into smaller patches, skipping urbanized areas
def slice_images(image_pairs, output_dir, patch_size=64):
    normal_dir = os.path.join(output_dir, "normal")
    ndvi_dir = os.path.join(output_dir, "ndvi")
    ensure_dir(normal_dir)
    ensure_dir(ndvi_dir)

    patch_id = 0

    for true_color_path, ndvi_path in image_pairs:
        img = cv2.imread(true_color_path)
        ndvi = cv2.imread(ndvi_path, cv2.IMREAD_GRAYSCALE)

        if img is None or ndvi is None:
            print(f"Error loading images: {true_color_path}, {ndvi_path}. Skipping...")
            continue

        urban_mask = get_urban_mask(true_color_path)  # Get urban mask from Segformer
        urban_mask_resized = cv2.resize(urban_mask, (img.shape[1], img.shape[0]), interpolation=cv2.INTER_NEAREST)

        ndvi_colormap = apply_reversed_summer_colormap(ndvi)

        h, w, _ = img.shape
        ph, pw = patch_size, patch_size

        for i in range(0, h - ph + 1, ph):
            for j in range(0, w - pw + 1, pw):
                if urban_mask_resized[i:i + ph, j:j + pw].sum() > 0:  # Skip urban patches
                    continue

                true_color_patch = img[i:i + ph, j:j + pw]
                ndvi_patch = ndvi_colormap[i:i + ph, j:j + pw]

                true_color_patch_path = os.path.join(normal_dir, f"patch_{patch_id}.png")
                ndvi_patch_path = os.path.join(ndvi_dir, f"patch_{patch_id}.png")

                cv2.imwrite(true_color_patch_path, true_color_patch)
                cv2.imwrite(ndvi_patch_path, ndvi_patch)

                patch_id += 1

    print(f"Saved {patch_id} patches to {output_dir}")

# List of image pairs (true color, NDVI)
image_pairs = [
    ("../color/1_True_color.jpg", "../ndvi/1_NDVI.jpg"),
    ("../color/2_True_color.jpg", "../ndvi/2_NDVI.jpg"),
    ("../color/3_True_color.jpg", "../ndvi/3_NDVI.jpg"),
    ("../color/4_True_color.jpg", "../ndvi/4_NDVI.jpg"),
]

# Output directory
output_directory = "combined_patches/"

# Slice images while skipping urban areas
slice_images(image_pairs, output_directory, patch_size=64)

preprocessor_config.json:   0%|          | 0.00/372 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


config.json:   0%|          | 0.00/1.11k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/14.9M [00:00<?, ?B/s]

Saved 454 patches to combined_patches/


In [3]:
def build_unet(input_shape=(64, 64, 3)):
    inputs = tf.keras.Input(shape=input_shape)

    # Encoder
    c1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(c1)
    p1 = layers.MaxPooling2D((2, 2))(c1)

    c2 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(c2)
    p2 = layers.MaxPooling2D((2, 2))(c2)

    # Bottleneck
    c3 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(p2)
    c3 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(c3)

    # Decoder
    u1 = layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c3)
    u1 = layers.concatenate([u1, c2])
    c4 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(u1)
    c4 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(c4)

    u2 = layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c4)
    u2 = layers.concatenate([u2, c1])
    c5 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(u2)
    c5 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(c5)

    outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(c5)  # Single NDVI output channel

    model = Model(inputs, outputs)
    return model

In [None]:
class PatchGenerator(Sequence):
    def __init__(self, normal_paths, ndvi_paths, batch_size):
        self.normal_paths = sorted(normal_paths)
        self.ndvi_paths = sorted(ndvi_paths)
        self.batch_size = batch_size

    def __len__(self):
        return len(self.normal_paths) // self.batch_size

    def __getitem__(self, idx):
        batch_normal = self.normal_paths[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_ndvi = self.ndvi_paths[idx * self.batch_size:(idx + 1) * self.batch_size]

        X = []
        Y = []

        for normal_path, ndvi_path in zip(batch_normal, batch_ndvi):
            normal_img = cv2.imread(normal_path) / 255.0  # Normalize to [0, 1]
            ndvi_img = cv2.imread(ndvi_path)[:, :, 1] / 255.0  # Extract green channel, normalize

            X.append(normal_img)
            Y.append(np.expand_dims(ndvi_img, axis=-1))  # Add channel dimension

        return np.array(X), np.array(Y)


# Paths to the patch directories
normal_dir = "combined_patches/normal"
ndvi_dir = "combined_patches/ndvi"

# Get all image paths
normal_paths = [os.path.join(normal_dir, f) for f in os.listdir(normal_dir)]
ndvi_paths = [os.path.join(ndvi_dir, f) for f in os.listdir(ndvi_dir)]

# Split the data into training and testing (80% training, 20% testing)
train_normal_paths, test_normal_paths, train_ndvi_paths, test_ndvi_paths = train_test_split(
    normal_paths, ndvi_paths, test_size=0.3, random_state=42
)

# Create data generators for training and testing
batch_size = 16
train_gen = PatchGenerator(train_normal_paths, train_ndvi_paths, batch_size)
test_gen = PatchGenerator(test_normal_paths, test_ndvi_paths, batch_size)

# Build the model
model = build_unet(input_shape=(64, 64, 3))
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=4e-4),
              loss='mean_squared_error',  # For regression
              metrics=['mae'])  # Optional: Monitor mean absolute error

# Train the model
model.fit(train_gen, epochs=6, validation_data=test_gen)

  self._warn_if_super_not_called()


Epoch 1/6
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 616ms/step - loss: 0.0452 - mae: 0.1790 - val_loss: 0.0478 - val_mae: 0.1860
Epoch 2/6
[1m 6/19[0m [32m━━━━━━[0m[37m━━━━━━━━━━━━━━[0m [1m5s[0m 418ms/step - loss: 0.0383 - mae: 0.1683

In [None]:
def predict_large_image(model, image_path, patch_size=64, stride=32, alpha=0.5):
    """
    Predict NDVI values for a large image using sliding windows,
    apply a reversed COLORMAP_SUMMER for visualization,
    and blend the prediction with the original image.
    """
    img = cv2.imread(image_path).astype(np.float32) / 255.0  # Normalize input image to float32
    h, w, _ = img.shape

    # Get the urban segmentation mask
    urban_mask = get_urban_mask(image_path)
    urban_mask_resized = cv2.resize(urban_mask, (w, h), interpolation=cv2.INTER_NEAREST)

    # Pad image to make it divisible by patch_size
    pad_h = (patch_size - h % patch_size) % patch_size
    pad_w = (patch_size - w % patch_size) % patch_size
    img_padded = np.pad(img, ((0, pad_h), (0, pad_w), (0, 0)), mode='reflect')

    output = np.zeros((h + pad_h, w + pad_w, 1), dtype=np.float32)  # Ensure output is float32

    # Sliding window over the image
    for i in range(0, img_padded.shape[0] - patch_size + 1, stride):
        for j in range(0, img_padded.shape[1] - patch_size + 1, stride):
            patch = img_padded[i:i + patch_size, j:j + patch_size]
            pred_patch = model.predict(np.expand_dims(patch, axis=0))[0]
            output[i:i + patch_size, j:j + patch_size] += pred_patch

    # Crop back to original size
    output = output[:h, :w]

    # Expand urban_mask_resized to have a third dimension
    urban_mask_resized_expanded = urban_mask_resized[..., np.newaxis]

    # Step 1: Apply the reversed colormap to the NDVI prediction
    pred_colormap = apply_reversed_summer_colormap(output).astype(np.float32) / 255.0  # Normalize colormap

    # Step 2: Blend the original image with the NDVI prediction
    blended_image = cv2.addWeighted(pred_colormap, alpha, img, 1 - alpha, 0)

    # Step 3: Combine the urban mask to finalize the result
    # Non-urban areas (mask=0) get the blended image, urban areas (mask=1) keep original image values
    final_image = blended_image * (1 - urban_mask_resized_expanded) + img * urban_mask_resized_expanded

    return final_image

# Predict NDVI on a large image
large_image_path = "../test/img.png"
ndvi_prediction = predict_large_image(model, large_image_path, alpha=0.4)

# Save the final image if needed
output_path = "../test/img_NDVI_PRED_OVERLAY.png"
final_image = (ndvi_prediction * 255).astype(np.uint8)  # Convert back to 8-bit image for saving
cv2.imwrite(output_path, final_image)