In [2]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array

IMG_SIZE = (256, 256)

def load_image_mask(image_path, mask_path):
    # Load RGB image
    img = load_img(image_path, target_size=IMG_SIZE)
    img = img_to_array(img) / 255.0  # normalize [0, 1]
    
    # Load mask (grayscale), normalize, ensure binary
    mask = load_img(mask_path, target_size=IMG_SIZE, color_mode="grayscale")
    mask = img_to_array(mask) / 255.0
    mask = np.where(mask > 0.5, 1.0, 0.0).astype(np.float32)  # hard binarize

    return img.astype(np.float32), mask.astype(np.float32)

def create_dataset(image_dir, mask_dir, batch_size=8, shuffle=True):
    image_paths = sorted([os.path.join(image_dir, f) for f in os.listdir(image_dir) if f.lower().endswith(('.png','.jpg','.jpeg'))])
    mask_paths  = sorted([os.path.join(mask_dir,  f) for f in os.listdir(mask_dir)  if f.lower().endswith(('.png','.jpg','.jpeg'))])
    assert len(image_paths) == len(mask_paths), "Image and mask counts must match"

    def gen():
        for ip, mp in zip(image_paths, mask_paths):
            img, mask = load_image_mask(ip, mp)
            yield img, mask

    ds = tf.data.Dataset.from_generator(
    gen,
    output_signature=(
        tf.TensorSpec(shape=(IMG_SIZE[0], IMG_SIZE[1], 3), dtype=tf.float32),
        tf.TensorSpec(shape=(IMG_SIZE[0], IMG_SIZE[1], 1), dtype=tf.float32)
    )
)
    if shuffle:
        ds = ds.shuffle(buffer_size=len(image_paths))
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds

# Paths
base_dir    = r"D:\oil_spill_detection\data"
train_images = os.path.join(base_dir, "train", "images")
train_masks  = os.path.join(base_dir, "train", "masks")
val_images   = os.path.join(base_dir, "val",   "images")
val_masks    = os.path.join(base_dir, "val",   "masks")
test_images  = os.path.join(base_dir, "test",  "images")
test_masks   = os.path.join(base_dir, "test",  "masks")

# Datasets
train_ds = create_dataset(train_images, train_masks, batch_size=8, shuffle=True)
val_ds   = create_dataset(val_images,   val_masks,  batch_size=8, shuffle=False)
test_ds  = create_dataset(test_images,  test_masks, batch_size=8, shuffle=False)


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.0.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "C:\Users\VANSH AGGARWAL\anaconda3\envs\tf_gpu\lib\runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\VANSH AGGARWAL\anaconda3\envs\tf_gpu\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\Users\VANSH AGGARWAL\anaconda3\envs\tf_gpu\lib\site-packages\ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "C:\Users\VANSH AGGARWAL\anaconda3\envs\tf_gpu\lib\site-packages\traitlets\config\application.py", line 10

AttributeError: _ARRAY_API not found


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.0.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "C:\Users\VANSH AGGARWAL\anaconda3\envs\tf_gpu\lib\runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\VANSH AGGARWAL\anaconda3\envs\tf_gpu\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\Users\VANSH AGGARWAL\anaconda3\envs\tf_gpu\lib\site-packages\ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "C:\Users\VANSH AGGARWAL\anaconda3\envs\tf_gpu\lib\site-packages\traitlets\config\application.py", line 10

AttributeError: _ARRAY_API not found

ImportError: numpy.core._multiarray_umath failed to import

ImportError: numpy.core.umath failed to import

TypeError: Unable to convert function return value to a Python type! The signature was
	() -> handle

In [None]:
import matplotlib.pyplot as plt

# Take one batch from train_ds
for images, masks in train_ds.take(1):
    # Convert tensors to numpy arrays
    img_np  = images[0].numpy()
    mask_np = masks[0].numpy().squeeze()  # convert + squeeze

    plt.figure(figsize=(8,4))

    plt.subplot(1,2,1)
    plt.imshow(img_np)
    plt.title("Original Image")
    plt.axis("off")

    plt.subplot(1,2,2)
    plt.imshow(mask_np, cmap="gray")
    plt.title("Paired Mask")
    plt.axis("off")

    plt.tight_layout()
    plt.show()

In [None]:
import tensorflow as tf
from tensorflow.keras import backend as K

def dice_coefficient(y_true, y_pred, smooth=1.0):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2.0 * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

def dice_loss(y_true, y_pred):
    return 1.0 - dice_coefficient(y_true, y_pred)

def bce_dice_loss(y_true, y_pred):
    bce = tf.keras.losses.BinaryCrossentropy(from_logits=False)(y_true, y_pred)
    return bce + dice_loss(y_true, y_pred)

def iou_metric(y_true, y_pred, smooth=1.0):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    union = K.sum(y_true_f) + K.sum(y_pred_f) - intersection
    return (intersection + smooth) / (union + smooth)

In [None]:
from tensorflow.keras import layers, models

def conv_block(x, filters):
    x = layers.Conv2D(filters, 3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(filters, 3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    return x

def encoder_block(x, filters):
    c = conv_block(x, filters)
    p = layers.MaxPooling2D((2, 2))(c)
    return c, p

def decoder_block(x, skip, filters):
    x = layers.Conv2DTranspose(filters, 2, strides=2, padding='same')(x)
    x = layers.Concatenate()([x, skip])
    x = conv_block(x, filters)
    return x

def build_unet(input_shape=(256, 256, 3)):
    inputs = layers.Input(shape=input_shape)

    # Encoder
    s1, p1 = encoder_block(inputs, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)

    # Bottleneck
    b = conv_block(p4, 1024)

    # Decoder
    d1 = decoder_block(b, s4, 512)
    d2 = decoder_block(d1, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)

    # Output (binary mask)
    outputs = layers.Conv2D(1, 1, activation='sigmoid')(d4)

    model = models.Model(inputs, outputs, name="UNet_OilSpill")
    return model

model = build_unet()
model.summary()

In [None]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss=bce_dice_loss,
    metrics=['accuracy', dice_coefficient, iou_metric]
)

early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=5, restore_best_weights=True
)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6
)
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    "oilspill_unet_best.h5", monitor='val_loss', save_best_only=True
)

In [None]:
import matplotlib.pyplot as plt

EPOCHS = 10

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=[early_stop, reduce_lr, checkpoint]
)

# Accuracy & Loss curves
fig, (ax1, ax2) = plt.subplots(1,2, figsize=(12,5))
ax1.plot(history.history['accuracy'], label='Train Acc')
ax1.plot(history.history['val_accuracy'], label='Val Acc')
ax1.set_title('Accuracy'); ax1.set_xlabel('Epoch'); ax1.legend(); ax1.grid(True)

ax2.plot(history.history['loss'], label='Train Loss')
ax2.plot(history.history['val_loss'], label='Val Loss')
ax2.set_title('Loss (BCE + Dice)'); ax2.set_xlabel('Epoch'); ax2.legend(); ax2.grid(True)

plt.tight_layout(); plt.show()

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, jaccard_score
import numpy as np

def evaluate_on_test(model, test_ds, threshold=0.5):
    y_true_all = []
    y_pred_all = []
    for imgs, masks in test_ds:
        preds = model.predict(imgs)
        preds_bin = (preds > threshold).astype(np.uint8)
        # Flatten for sklearn metrics: per-pixel classification
        y_true_all.append(masks.numpy().ravel())
        y_pred_all.append(preds_bin.ravel())

    y_true = np.concatenate(y_true_all)
    y_pred = np.concatenate(y_pred_all)

    precision = precision_score(y_true, y_pred, average='binary', zero_division=0)
    recall    = recall_score(y_true, y_pred, average='binary',  zero_division=0)
    f1        = f1_score(y_true, y_pred, average='binary',      zero_division=0)
    iou       = jaccard_score(y_true, y_pred, average='binary')

    return {
        "Pixel Precision": float(precision),
        "Pixel Recall": float(recall),
        "Pixel F1 (Dice)": float(f1),
        "Pixel IoU": float(iou)
    }

test_report = evaluate_on_test(model, test_ds, threshold=0.5)
print(test_report)

In [None]:
import matplotlib.pyplot as plt

def visualize_segmentation_results(model, dataset, num_samples=3, threshold=0.5):
    images, masks = next(iter(dataset))
    preds = model.predict(images)
    preds_bin = (preds > threshold).astype(np.float32)

    for i in range(min(num_samples, images.shape[0])):
        plt.figure(figsize=(12,4))

        # Original image
        plt.subplot(1,3,1)
        plt.imshow(images[i])
        plt.title("Original")
        plt.axis("off")

        # Ground truth mask
        plt.subplot(1,3,2)
        plt.imshow(masks[i].squeeze(), cmap="gray")
        plt.title("Ground Truth")
        plt.axis("off")

        # Predicted mask
        plt.subplot(1,3,3)
        plt.imshow(preds_bin[i].squeeze(), cmap="gray")
        plt.title("Prediction")
        plt.axis("off")

        plt.tight_layout()
        plt.show()

# Example usage:
visualize_segmentation_results(model, test_ds, num_samples=3, threshold=0.5)

In [None]:
import tensorflow as tf
print(tf.config.list_physical_devices('GPU'))