<a href="https://colab.research.google.com/github/ayshaashra/learn-Coding/blob/main/mian_unet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
import os
import numpy as np
from PIL import Image
import skimage

from skimage.util import view_as_windows
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, Concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import logging

In [11]:
# Configurations and hyperparameters
config = {
    # Dataset root; load_dataset() expects <base_dir>/<subset>/{A,B,label}.
    # Set the LEVIR_CD_DIR environment variable to point at the data on another
    # machine (e.g. Colab) instead of editing this machine-specific default.
    'base_dir': os.environ.get('LEVIR_CD_DIR', 'D:/Working/data/LEVIR-CD1'),
    'patch_size': 256,     # side length (pixels) of the square patches fed to the model
    'batch_size': 32,      # number of image pairs (not patches) read per generator batch
    'epochs': 100,         # upper bound; EarlyStopping may end training sooner
    'augment_data': True   # forwarded to data_generator(augment=...) for the training set
}

In [12]:
# Configure the root logger once: INFO and above, each record prefixed with
# its timestamp and severity.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Efficient patch extraction function using skimage
def cut_into_patches(image_path, patch_size=256):
    """Load an image as 8-bit grayscale and split it into non-overlapping square patches.

    Args:
        image_path: Path of the image file to read.
        patch_size: Side length, in pixels, of each square patch.

    Returns:
        Float array of shape ``(num_patches, patch_size, patch_size)`` whose
        pixel values have been divided by 255. Because the window step equals
        the patch size, trailing rows/columns that do not fill a whole patch
        are dropped.
    """
    # The context manager guarantees the file handle is released even if
    # decoding or the grayscale conversion raises.
    with Image.open(image_path) as source:
        pixels = np.array(source.convert("L")) / 255.0  # Normalize while converting to an array
    window_shape = (patch_size, patch_size)
    # Stepping by the window size makes the sliding windows tile the image
    # without overlap; the grid of windows is then flattened into one axis.
    patches = view_as_windows(pixels, window_shape, step=patch_size)
    return patches.reshape(-1, patch_size, patch_size)


In [13]:
# Data generator with optional augmentation
def data_generator(image_pairs, labels, batch_size, patch_size, augment=False):
    """Endlessly yield ``(X, Y)`` training batches built from image-pair patches.

    Args:
        image_pairs: Sequence of ``(image_a_path, image_b_path)`` tuples.
        labels: Sequence of label-mask paths; ``labels[i]`` is the mask for
            ``image_pairs[i]``.
        batch_size: Number of image pairs (not patches) loaded per batch.
            Every successfully loaded pair contributes all of its patches, so
            a batch holds up to ``batch_size * patches_per_image`` samples.
        patch_size: Side length of the square patches.
        augment: Accepted for interface compatibility. No augmentation is
            implemented yet, so patches are yielded unchanged.

    Yields:
        ``(X, Y)`` arrays. ``X`` has shape ``(n, patch_size, patch_size, 2)``
        with the A and B patches stacked on the last axis; ``Y`` has shape
        ``(n, patch_size, patch_size, 2)`` holding one-hot class maps.

    Raises:
        ValueError: If the inputs are empty, have different lengths, or
            ``batch_size`` is not positive.
        RuntimeError: If a complete pass over the data yields no batch
            because every sample failed to load.
    """
    if len(image_pairs) != len(labels):
        raise ValueError(
            f"image_pairs and labels must have the same length "
            f"({len(image_pairs)} != {len(labels)})"
        )
    if len(image_pairs) == 0:
        # Without this guard the loop below would spin forever without yielding.
        raise ValueError("data_generator received no samples")
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    if augment:
        logging.warning("augment=True requested but no augmentation is implemented; "
                        "patches are yielded unchanged")

    # Keep every pair attached to its label so shuffling cannot de-sync them.
    samples = list(zip(image_pairs, labels))

    while True:
        # Shuffle at each epoch via an index permutation so the caller's
        # lists are left untouched.
        order = np.random.permutation(len(samples))
        yielded_any = False
        for start in range(0, len(samples), batch_size):
            X_batch, Y_batch = [], []
            for index in order[start:start + batch_size]:
                try:
                    (image_a_path, image_b_path), label_path = samples[index]
                    image_a_patches = cut_into_patches(image_a_path, patch_size)
                    image_b_patches = cut_into_patches(image_b_path, patch_size)
                    label_patches = cut_into_patches(label_path, patch_size)

                    # Build the sample's patches completely before touching the
                    # batch lists, so a failure cannot leave X and Y misaligned.
                    sample_x, sample_y = [], []
                    for patch_a, patch_b, patch_label in zip(image_a_patches, image_b_patches, label_patches):
                        sample_x.append(np.stack([patch_a, patch_b], axis=-1))
                        # Threshold instead of relying on float->int truncation,
                        # which would map every value below exactly 1.0 to class 0.
                        class_ids = (patch_label >= 0.5).astype(np.int64)
                        sample_y.append(to_categorical(class_ids, num_classes=2).reshape(patch_size, patch_size, 2))
                except Exception as e:
                    logging.error(f"Failed to process data: {e}")
                    continue
                X_batch.extend(sample_x)
                Y_batch.extend(sample_y)

            if not X_batch:
                # Every file in this slice failed; an empty batch is useless to the model.
                continue
            yielded_any = True
            yield np.array(X_batch), np.array(Y_batch)

        if not yielded_any:
            raise RuntimeError("No sample could be loaded during a full pass over the dataset")

In [14]:
# Load dataset
def load_dataset(base_dir, subset):
    """Collect matching A/B/label image paths for one dataset split.

    Looks in ``<base_dir>/<subset>/A``, ``.../B`` and ``.../label`` and keeps
    every PNG filename that is present in all three folders.

    Args:
        base_dir: Dataset root directory.
        subset: Name of the split sub-folder (the notebook uses ``'train'``
            and ``'val'``).

    Returns:
        ``(image_pairs, label_paths)`` where ``image_pairs[i]`` is an
        ``(a_path, b_path)`` tuple and ``label_paths[i]`` is the mask path for
        the same filename. Both lists are sorted by filename.

    Raises:
        FileNotFoundError: If any of the three folders does not exist
            (raised by ``os.listdir``).
    """
    dir_path = os.path.join(base_dir, subset)
    a_dir = os.path.join(dir_path, 'A')
    b_dir = os.path.join(dir_path, 'B')
    label_dir = os.path.join(dir_path, 'label')

    def _png_names(directory):
        # Case-insensitive so both ``.png`` and ``.PNG`` files are picked up.
        return {name for name in os.listdir(directory) if name.lower().endswith('.png')}

    a_names = _png_names(a_dir)
    b_names = _png_names(b_dir)
    label_names = _png_names(label_dir)

    # Match by filename rather than by sorted position: with positional
    # pairing a single extra or missing file shifts every later entry and
    # silently discards the rest of the split.
    common = sorted(a_names & b_names & label_names)
    unmatched = (a_names | b_names | label_names) - set(common)
    if unmatched:
        logging.warning(f"Skipping {len(unmatched)} file(s) in {dir_path} not present in all of A, B and label")

    image_pairs = [(os.path.join(a_dir, name), os.path.join(b_dir, name)) for name in common]
    label_paths = [os.path.join(label_dir, name) for name in common]
    return image_pairs, label_paths

In [15]:
# Main function to run the model training
def _steps_per_pass(num_samples, batch_size):
    """Return the number of batches in one full pass (ceiling division)."""
    return (num_samples + batch_size - 1) // batch_size


if __name__ == "__main__":
    train_pairs, train_labels = load_dataset(config['base_dir'], 'train')
    val_pairs, val_labels = load_dataset(config['base_dir'], 'val')

    # Fail early with a clear message instead of handing Keras a generator
    # that has nothing to produce.
    if not train_pairs or not val_pairs:
        raise ValueError(
            f"No matched image/label files found under {config['base_dir']!r} "
            f"(train: {len(train_pairs)}, val: {len(val_pairs)})"
        )

    train_generator = data_generator(train_pairs, train_labels, config['batch_size'], config['patch_size'], config['augment_data'])
    val_generator = data_generator(val_pairs, val_labels, config['batch_size'], config['patch_size'])

    # NOTE(review): build_unet is not defined in the cells visible here; confirm
    # it is defined earlier in the notebook and returns a compiled model.
    model = build_unet((config['patch_size'], config['patch_size'], 2))
    checkpoint = ModelCheckpoint('model_best.h5', monitor='val_loss', save_best_only=True, verbose=1)
    early_stop = EarlyStopping(monitor='val_loss', patience=10, verbose=1)

    # The generators emit one batch per `batch_size` image pairs, including a
    # final partial batch, so round up. Floor division would give 0 steps for
    # a split smaller than one batch and let epochs drift out of step with
    # passes over the data.
    model.fit(train_generator,
              steps_per_epoch=_steps_per_pass(len(train_pairs), config['batch_size']),
              epochs=config['epochs'],
              validation_data=val_generator,
              validation_steps=_steps_per_pass(len(val_pairs), config['batch_size']),
              callbacks=[checkpoint, early_stop])

FileNotFoundError: [Errno 2] No such file or directory: 'D:/Working/data/LEVIR-CD1/train/A'