In [30]:
import segmentation_models as sm
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
import numpy as np
import cv2
import os
import matplotlib.pyplot as plt
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

In [32]:
# Define the color map
color_map = {
    0: [0, 0, 0],                # void
    1: [108, 64, 20],            # dirt
    2: [255, 229, 204],          # sand
    3: [0, 102, 0],              # grass
    4: [0, 255, 0],              # tree
    5: [0, 153, 153],            # pole
    6: [0, 128, 255],            # water
    7: [0, 0, 255],              # sky
    8: [255, 255, 0],            # vehicle
    9: [255, 0, 127],            # container/generic-object
    10: [64, 64, 64],            # asphalt
    11: [255, 128, 0],           # gravel
    12: [255, 0, 0],             # building
    13: [153, 76, 0],            # mulch
    14: [102, 102, 0],           # rock-bed
    15: [102, 0, 0],             # log
    16: [0, 255, 128],           # bicycle
    17: [204, 153, 255],         # person
    18: [102, 0, 204],           # fence
    19: [255, 153, 204],         # bush
    20: [0, 102, 102],           # sign
    21: [153, 204, 255],         # rock
    22: [102, 255, 255],         # bridge
    23: [101, 101, 11],          # concrete
    24: [114, 85, 47]            # picnic-table
}

# Create a reverse lookup to map colors to class labels
color_to_class = {tuple(value): key for key, value in color_map.items()}

def color_to_class_label(mask):
    # Convert mask to a 2D array where each pixel value is represented by an RGB tuple
    mask_rgb = mask.reshape(-1, 3)  # Reshape to a 2D array where each row is an RGB pixel

    # Map RGB values to class labels
    label_mask = np.array([color_to_class.get(tuple(rgb), 0) for rgb in mask_rgb])

    # Reshape back to the original mask shape (height, width)
    label_mask = label_mask.reshape(mask.shape[0], mask.shape[1])

    return label_mask

# def color_to_class_label(mask):
#     # Create an empty label mask
#     label_mask = np.zeros((mask.shape[0], mask.shape[1]), dtype=np.int32)  # Use np.int32 instead of np.long
    
#     # For each pixel, get the RGB value and map it to the class label
#     for i in range(mask.shape[0]):
#         for j in range(mask.shape[1]):
#             rgb_value = tuple(mask[i, j])
#             if rgb_value in color_to_class:
#                 label_mask[i, j] = color_to_class[rgb_value]
#             else:
#                 label_mask[i, j] = 0  # Default to class 0 if color is not found

#     return label_mask

In [36]:
# Paths
IMG_DIR = "./content/RUGD_frames"
MASK_DIR = "./content/RUGD_annotations"

IMG_SIZE = (672, 544)
NUM_CLASSES = 25  # Cityscapes has 19 semantic classes
BATCH_SIZE = 8  # Reduce batch size if kernel still crashes


# 🔹 **Get List of Files & Split Data**
def get_data_splits(test_size=0.1, val_size=0.1):
    img_files = []
    
    for root, _, files in os.walk(IMG_DIR):
        for file in files:
            if file.endswith('.png'):
                img_files.append(os.path.join(root, file))

    img_files.sort()  # Ensure consistency
    np.random.shuffle(img_files)  # Shuffle dataset

    # Split into train, validation, and test
    train_files, test_files = train_test_split(img_files, test_size=test_size, random_state=42)
    train_files, val_files = train_test_split(train_files, test_size=val_size / (1 - test_size), random_state=42)

    return train_files, val_files, test_files

# 🔹 **Function to Load Single Image**
def load_image(image_path):
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
    img = cv2.resize(img, IMG_SIZE)
    img = img / 255.0  # Normalize
    return img

# 🔹 **Function to Load Single Mask**
def load_mask(mask_path):
    mask = cv2.imread(mask_path)  # Read as grayscale
    mask_resized = cv2.resize(mask, IMG_SIZE, interpolation=cv2.INTER_NEAREST)

    label_mask = color_to_class_label(mask_resized)
    
    return label_mask.astype(np.int32)  # Keep as integers (not one-hot)

# 🔹 **Function to Load Data Efficiently Using a Generator**
def rugd_generator(split="train"):     
    for root, dirs, files in os.walk(IMG_DIR):
        for file in files:
            if file.endswith('.png'):
                img_path = os.path.join(root, file)
                mask_path = os.path.join(MASK_DIR, os.path.relpath(img_path, IMG_DIR))

                img = load_image(img_path)
                mask = load_mask(mask_path)

                yield img, mask


# 🔹 **Convert Generator to `tf.data.Dataset`**
def get_rugd_dataset(split="train", batch_size=BATCH_SIZE):
    dataset = tf.data.Dataset.from_generator(
        lambda: rugd_generator(split),
        output_signature=(
            tf.TensorSpec(shape=(544, 672, 3), dtype=tf.float32),
            tf.TensorSpec(shape=(544, 672), dtype=tf.int32)
        )
    )

    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)  # Optimize performance
    return dataset



# 🔹 **Check the Dataset**
for img_batch, mask_batch in train_dataset.take(1):  # Check first batch
    print(f"Batch Image Shape: {img_batch.shape}, Batch Mask Shape: {mask_batch.shape}")

# 🔹 **Create Train, Validation & Test Datasets**
train_files, val_files, test_files = get_data_splits()

train_dataset = get_rugd_dataset(train_files)
val_dataset = get_rugd_dataset(val_files)
test_dataset = get_rugd_dataset(test_files)

print(f"Total images: {len(train_files) + len(val_files) + len(test_files)}")
print(f"Train set: {len(train_files)} images")
print(f"Validation set: {len(val_files)} images")
print(f"Test set: {len(test_files)} images")

Batch Image Shape: (8, 544, 672, 3), Batch Mask Shape: (8, 544, 672)
Total images: 7436
Train set: 5948 images
Validation set: 744 images
Test set: 744 images


2025-03-07 15:00:48.267373: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


In [23]:
# Ensure compatibility with TensorFlow 2.x
sm.set_framework("tf.keras")

# Set backbone (EfficientNet or ResNet)
BACKBONE = "efficientnetb3"  # Try 'resnet50' for ResNet backbone

# Load Pretrained U-Net (or PSPNet, FPN)
model = sm.Unet(BACKBONE, 
                encoder_weights="imagenet", 
                classes=25, 
                activation="softmax",
               input_shape=(544, 672, 3)
               )

# Freeze ALL layers initially
for layer in model.layers:
    layer.trainable = False

# Unfreeze only the decoder
for layer in model.layers:
    if "decoder" in layer.name:
        layer.trainable = True  # Fine-tune decoder

# Compile Model
model.compile(optimizer=Adam(learning_rate=0.0001), 
              loss="sparse_categorical_crossentropy", 
              metrics=["accuracy"])

model.summary()



In [24]:
tf.config.run_functions_eagerly(True)  # Forces eager execution


In [27]:
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=30
)


Epoch 1/30
      2/Unknown [1m96s[0m 43s/step - accuracy: 0.0201 - loss: 3.3958


KeyboardInterrupt



In [None]:
# Evaluate on Validation Set
val_loss, val_acc = model.evaluate(val_dataset)
print(f"Validation Accuracy: {val_acc:.4f}")

# Test on a Sample Image
for test_img, test_mask in test_dataset.take(1):  # Take first test batch
    pred_mask = model.predict(test_img)
    pred_mask = np.argmax(pred_mask, axis=-1)  # Convert from one-hot to class indices
    break  # Only process one batch

# Display Results
plt.figure(figsize=(10, 5))
plt.subplot(1, 3, 1); plt.imshow(test_img[0]); plt.title("Input Image")
plt.subplot(1, 3, 2); plt.imshow(test_mask[0]); plt.title("Ground Truth")
plt.subplot(1, 3, 3); plt.imshow(pred_mask[0]); plt.title("Predicted Mask")
plt.show()
