In [28]:
# =========================================================
# AI SpillGuard - Oil Spill Detection (U-Net Pipeline)
# =========================================================
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
from scipy.ndimage import uniform_filter
import tensorflow as tf
from tensorflow.keras import layers, models
from glob import glob
import warnings
warnings.filterwarnings("ignore")
os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"

In [15]:
# -------------------------------
# 1. Dataset Configuration
# -------------------------------
DATASET_PATH = "DataSet"
TRAIN_PATH   = os.path.join(DATASET_PATH, "train")
VAL_PATH     = os.path.join(DATASET_PATH, "val")
TEST_PATH    = os.path.join(DATASET_PATH, "test")

IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS = 256, 256, 3
IMAGE_TYPE = 'RGB'   # or 'SAR'

print(f"Dataset Path: {DATASET_PATH}")
print(f"Image Type: {IMAGE_TYPE}")
print(f"Image Size: {IMG_HEIGHT}x{IMG_WIDTH}x{IMG_CHANNELS}")

Dataset Path: DataSet
Image Type: RGB
Image Size: 256x256x3


In [16]:
# -------------------------------
# 2. SAR & RGB Preprocessing
# -------------------------------
def lee_filter(img, window_size=5):
    """Lee speckle filter for SAR images"""
    img = img.astype(np.float32)
    img_mean = uniform_filter(img, window_size)
    img_sqr_mean = uniform_filter(img**2, window_size)
    img_variance = img_sqr_mean - img_mean**2
    overall_variance = np.var(img)
    weights = img_variance / (img_variance + overall_variance + 1e-10)
    return img_mean + weights * (img - img_mean)

def preprocess_sar(image):
    """Preprocess SAR image"""
    image = lee_filter(image, window_size=5)
    image = np.log(image + 1e-10)
    image = (image - image.mean()) / (image.std() + 1e-10)
    image = (image - image.min()) / (image.max() - image.min() + 1e-10)
    return image.astype(np.float32)

def preprocess_rgb(image):
    """Normalize RGB image to [0,1]"""
    return image.astype(np.float32) / 255.0

In [23]:
# -------------------------------
# 3. Data Loader (TF Dataset)
# -------------------------------
IMG_SIZE = 256

def load_image(img_path, mask_path, image_type="RGB"):
    """Load and preprocess image + mask"""
    # --- Image ---
    img = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)
    if image_type == "SAR":
        img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
        img = preprocess_sar(img.astype(np.float32)/255.0)
        img = np.stack([img, img, img], axis=-1)  # 3-channel
    else:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
        img = preprocess_rgb(img)

    # --- Mask ---
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    mask = cv2.resize(mask, (IMG_SIZE, IMG_SIZE), interpolation=cv2.INTER_NEAREST)
    mask = (mask > 127).astype(np.float32)
    mask = np.expand_dims(mask, axis=-1)

    return img, mask


def build_dataset(img_dir, mask_dir, batch_size=8, shuffle=True, image_type="RGB"):
    exts = ("*.png", "*.jpg", "*.jpeg", "*.tif")
    img_files, mask_files = [], []
    for ext in exts:
        img_files.extend(sorted(glob(os.path.join(img_dir, ext))))
        mask_files.extend(sorted(glob(os.path.join(mask_dir, ext))))

    print(f"[DEBUG] {img_dir}: {len(img_files)} images, {len(mask_files)} masks")

    if len(img_files) == 0 or len(mask_files) == 0:
        raise ValueError(f"No images or masks found in {img_dir} or {mask_dir}")

    def _generator():
        for img_path, mask_path in zip(img_files, mask_files):
            img, mask = load_image(img_path, mask_path, image_type)
            yield img, mask

    dataset = tf.data.Dataset.from_generator(
        _generator,
        output_types=(tf.float32, tf.float32),
        output_shapes=([IMG_SIZE, IMG_SIZE, 3], [IMG_SIZE, IMG_SIZE, 1])
    )

    if shuffle:
        dataset = dataset.shuffle(buffer_size=100)

    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)


In [24]:
# -------------------------------
# 4. U-Net Model
# -------------------------------
def simple_unet(input_size=(256, 256, 3)):
    inputs = layers.Input(input_size)

    def conv_block(x, filters):
        x = layers.Conv2D(filters, (3, 3), padding="same")(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        x = layers.Conv2D(filters, (3, 3), padding="same")(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        return x

    c1 = conv_block(inputs, 32)
    p1 = layers.MaxPooling2D((2, 2))(c1)

    c2 = conv_block(p1, 64)
    p2 = layers.MaxPooling2D((2, 2))(c2)

    c3 = conv_block(p2, 128)
    p3 = layers.MaxPooling2D((2, 2))(c3)

    c4 = conv_block(p3, 256)
    p4 = layers.MaxPooling2D((2, 2))(c4)

    bn = conv_block(p4, 512)
    bn = layers.Dropout(0.5)(bn)

    def up_block(x, skip, filters):
        x = layers.Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding="same")(x)
        x = layers.concatenate([x, skip])
        x = conv_block(x, filters)
        return x

    u6 = up_block(bn, c4, 256)
    u7 = up_block(u6, c3, 128)
    u8 = up_block(u7, c2, 64)
    u9 = up_block(u8, c1, 32)

    outputs = layers.Conv2D(1, (1, 1), activation="sigmoid")(u9)
    return models.Model(inputs, outputs)


In [25]:
# -------------------------------
# 5. Loss & Metrics
# -------------------------------
def dice_coefficient(y_true, y_pred, smooth=1e-6):
    y_true_f = tf.keras.backend.flatten(y_true)
    y_pred_f = tf.keras.backend.flatten(y_pred)
    intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (
        tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) + smooth
    )

def iou_score(y_true, y_pred, smooth=1e-6):
    y_true_f = tf.keras.backend.flatten(y_true)
    y_pred_f = tf.keras.backend.flatten(y_pred)
    intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
    union = tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) - intersection
    return (intersection + smooth) / (union + smooth)

def bce_dice_loss(y_true, y_pred):
    bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    bce = tf.reduce_mean(bce)
    return bce + (1 - dice_coefficient(y_true, y_pred))

In [30]:
# -------------------------------
# 6. Build Datasets
# -------------------------------
train_ds = build_dataset(TRAIN_PATH + "/images", TRAIN_PATH + "/masks", batch_size=2, shuffle=True, image_type=IMAGE_TYPE)
val_ds   = build_dataset(VAL_PATH + "/images", VAL_PATH + "/masks", batch_size=2, shuffle=False, image_type=IMAGE_TYPE)
test_ds  = build_dataset(TEST_PATH + "/images", TEST_PATH + "/masks", batch_size=2, shuffle=False, image_type=IMAGE_TYPE)

[DEBUG] DataSet\train/images: 811 images, 811 masks
[DEBUG] DataSet\val/images: 203 images, 203 masks
[DEBUG] DataSet\test/images: 254 images, 254 masks


In [31]:
# -------------------------------
# 7. Compile & Train Model
# -------------------------------
model = simple_unet(input_size=(IMG_SIZE, IMG_SIZE, 3))
model.compile(optimizer=tf.keras.optimizers.Adam(1e-4),
              loss=bce_dice_loss,
              metrics=["accuracy", dice_coefficient, iou_score])

model.summary()

callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3)
]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=5,
    callbacks=callbacks
)


# -------------------------------
# 8. Evaluation
# -------------------------------
test_loss, test_acc, test_dice, test_iou = model.evaluate(test_ds)
print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}, Test Dice: {test_dice:.4f}, Test IoU: {test_iou:.4f}")

Epoch 1/5
      2/Unknown [1m28s[0m 1s/step - accuracy: 0.6332 - dice_coefficient: 0.6220 - iou_score: 0.4529 - loss: 1.0722 

AbortedError: Graph execution error:

Detected at node StatefulPartitionedCall/gradient_tape/functional_5_1/conv2d_104_1/convolution/Conv2DBackpropFilter defined at (most recent call last):
<stack traces unavailable>
Operation received an exception:Status: 1, message: could not create a memory object, in file tensorflow/core/kernels/mkl/mkl_conv_grad_filter_ops.cc:685
	 [[{{node StatefulPartitionedCall/gradient_tape/functional_5_1/conv2d_104_1/convolution/Conv2DBackpropFilter}}]] [Op:__inference_multi_step_on_iterator_88616]

In [None]:
# -------------------------------
# 9. Visualization
# -------------------------------
def visualize_predictions(dataset, num_samples=3):
    for imgs, masks in dataset.take(1):
        preds = model.predict(imgs)
        preds = (preds > 0.5).astype(np.uint8)

        plt.figure(figsize=(12, num_samples*4))
        for i in range(num_samples):
            plt.subplot(num_samples, 3, 3*i+1)
            plt.imshow(imgs[i])
            plt.title("Input Image")
            plt.axis("off")

            plt.subplot(num_samples, 3, 3*i+2)
            plt.imshow(masks[i].numpy().squeeze(), cmap="gray")
            plt.title("Ground Truth")
            plt.axis("off")

            plt.subplot(num_samples, 3, 3*i+3)
            plt.imshow(preds[i].squeeze(), cmap="gray")
            plt.title("Prediction")
            plt.axis("off")

        plt.show()

visualize_predictions(test_ds, num_samples=3)