In [None]:
import numpy as np
import cv2
from matplotlib import pyplot as plt
import os
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda
from keras.optimizers import Adam
from keras.layers import Activation, MaxPool2D, Concatenate
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import img_to_array
from keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping
import time
import tensorflow.keras.backend as K
from tensorflow.keras.metrics import MeanIoU

%matplotlib inline

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!unzip "/content/test.zip"
!unzip "/content/train.zip"

In [None]:
def extract_focal_and_baseline(calib_file):
    with open(calib_file, 'r') as file:
        lines = file.readlines()
        P0 = np.array([float(x) for x in lines[0].strip().split()[1:]])
        P1 = np.array([float(x) for x in lines[1].strip().split()[1:]])
        f = P0[0]
        b = (P1[3] - P0[3]) / f
    return f, abs(b)

def load_data(img_left_folder, img_right_folder, calib_folder, mask_folder):
    images_left_files = [f for f in os.listdir(img_left_folder) if f.endswith(('.png', '.jpg'))]
    images_left_files.sort()

    stereo_images = []
    for filename in images_left_files:
        left_image_path = os.path.join(img_left_folder, filename)
        right_image_path = os.path.join(img_right_folder, filename)

        calib_filename = filename.replace('.jpg', '.txt').replace('.png', '.txt')
        calib_file_path = os.path.join(calib_folder, calib_filename)

        parts = filename.split('_')
        mask_filename = parts[0] + '_road_' + '_'.join(parts[1:])
        mask_filename = mask_filename.replace('.jpg', '.png')
        mask_path = os.path.join(mask_folder, mask_filename)

        if os.path.exists(right_image_path) and os.path.exists(calib_file_path) and os.path.exists(mask_path):
            image_left = cv2.imread(left_image_path, cv2.IMREAD_COLOR)
            image_right = cv2.imread(right_image_path, cv2.IMREAD_COLOR)
            mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)  # Load the mask

            # Resize images and mask
            image_left = cv2.resize(image_left, (640, 192), interpolation=cv2.INTER_LINEAR)
            image_right = cv2.resize(image_right, (640, 192), interpolation=cv2.INTER_LINEAR)
            mask = cv2.resize(mask, (640, 192), interpolation=cv2.INTER_LINEAR)

            if image_left is not None and image_right is not None and mask is not None:
                f, b = extract_focal_and_baseline(calib_file_path)
                stereo_images.append((image_left, image_right, mask, f, b))
            else:
                print(f"Error loading files for {filename}")
        else:
            if not os.path.exists(right_image_path):
                print(f"Warning: No corresponding right image found for {filename} in {img_right_folder}")
            if not os.path.exists(calib_file_path):
                print(f"Warning: No corresponding calibration file found for {calib_filename} in {calib_folder}")
            if not os.path.exists(mask_path):
                print(f"Warning: No corresponding mask found for {mask_filename} in {mask_folder}")
    return stereo_images

img_left_folder = "/content/train/image_left"
img_right_folder = "/content/train/image_right"
calib_folder = "/content/train/calib"
mask_folder = "/content/train/gt_image_left"

stereo_images = load_data(img_left_folder, img_right_folder, calib_folder, mask_folder)
print(f"Loaded {len(stereo_images)} stereo image pairs with calibration data and masks.")

Loaded 135 stereo image pairs with calibration data and masks.


## 1. Compute disparity between the two stereo images.

In [None]:
# Experiment the best combination of numDisparities and blockSize

img_left, img_right = stereo_images[0][0], stereo_images[0][1]  # Test on the first pair to start

# Convert images to grayscale
img_left_gray = cv2.cvtColor(img_left, cv2.COLOR_BGR2GRAY)
img_right_gray = cv2.cvtColor(img_right, cv2.COLOR_BGR2GRAY)

numDisparities_values = [64, 80, 96, 112]  # numDisparities must be divisible by 16
blockSizes = [21, 23, 25, 27, 29]  # block sizes to test

# Iterate over each combination of numDisparities and blockSize
for nd in numDisparities_values:
    for bs in blockSizes:
        stereo = cv2.StereoBM_create(numDisparities=nd, blockSize=bs)
        disparity = stereo.compute(img_left_gray, img_right_gray)

        # Plot the result
        plt.figure(figsize=(10, 3))
        plt.imshow(disparity, 'plasma')
        plt.title(f'numDisparities={nd}, blockSize={bs}')
        plt.colorbar()
        plt.show()

In [None]:
# Plot the disparity map for the first 5 stereo images
for i, (img_left, img_right, mask, f, b) in enumerate(stereo_images[:5]):
    if img_left is not None and img_right is not None:
        plt.figure(figsize=(21, 7))
        plt.subplot(131)
        plt.imshow(cv2.cvtColor(img_left, cv2.COLOR_BGR2RGB))
        plt.title('Left Image')

        plt.subplot(132)
        plt.imshow(cv2.cvtColor(img_right, cv2.COLOR_BGR2RGB))
        plt.title('Right Image')

        img_left_gray = cv2.cvtColor(img_left, cv2.COLOR_BGR2GRAY)
        img_right_gray = cv2.cvtColor(img_right, cv2.COLOR_BGR2GRAY)
        stereo = cv2.StereoBM_create(numDisparities=96, blockSize=21)
        disparity = stereo.compute(img_left_gray, img_right_gray).astype(np.float32) / 16.0
        plt.subplot(133)
        plt.imshow(disparity, 'gray')
        plt.title('Disparity Map')
        plt.show()
    else:
        print(f"Skipping image pair {i} due to loading error.")




## 2. Compute depth of each pixel. Compute 3D location of each pixel.


In [None]:
def compute_depth_map(img_left, img_right, f, b):
    img_left_gray = cv2.cvtColor(img_left, cv2.COLOR_BGR2GRAY)
    img_right_gray = cv2.cvtColor(img_right, cv2.COLOR_BGR2GRAY)

    stereo = cv2.StereoBM_create(numDisparities=96, blockSize=21)
    disparity = stereo.compute(img_left_gray, img_right_gray).astype(np.float32) / 16.0

    disparity[disparity == 0] = 0.1  # To avoid division by zero
    depth = f * b / disparity

    # Normalize depth to span 0 to 255
    # Convert depth values from meters to a scale of 0-255 for display purposes
    max_depth = 50
    min_depth = 0
    depth_normalized = 255 * (depth - min_depth) / (max_depth - min_depth)

    # Clip values to ensure they remain within [0, 255]
    depth_normalized = np.clip(depth_normalized, 0, 255)

    # Convert to an 8-bit unsigned integer
    depth_normalized = depth_normalized.astype(np.uint8)

    return depth_normalized

In [None]:
# Compute and display depth maps for the first few stereo image pairs
for img_left, img_right, mask, f, b in stereo_images[:5]:
    if img_left is not None and img_right is not None:
        plt.figure(figsize=(21, 7))
        plt.subplot(131)
        plt.imshow(cv2.cvtColor(img_left, cv2.COLOR_BGR2RGB))
        plt.title('Left Image')

        plt.subplot(132)
        plt.imshow(cv2.cvtColor(img_right, cv2.COLOR_BGR2RGB))
        plt.title('Right Image')

        plt.subplot(133)
        depth = compute_depth_map(img_left, img_right, f, b)
        depth_display = plt.imshow(depth, cmap='viridis')
        plt.title('Depth Map')
        plt.colorbar(depth_display, orientation='vertical', shrink=0.6)
        plt.show()
    else:
        print(f"Skipping image pair {i} due to loading error.")

## 3. Train a road classifier on a set of annotated images, and compute road pixels in your image.


In [None]:
%load_ext tensorboard

In [None]:
def compute_and_fuse_depth(stereo_images):
    training_data = []
    for idx, (img_left, img_right, mask, f, b) in enumerate(stereo_images):
        # Compute the depth map
        depth_map = compute_depth_map(img_left, img_right, f, b)

        # Normalize the mask and convert to float
        mask_normalized = mask.astype(float) / 255.0  # Assuming mask in 0-255 range

        # Normalize depth to range [0,1] if depth_map values are 0-255
        # If depth values are not in 0-255 range, adjust normalization accordingly
        depth_normalized = (depth_map / 255.0)
        inverted_depth = 1.0 - depth_normalized

        # Create a fused depth map where we only apply the mask where depth_map values are non-zero
        # and mask is white (mask_normalized == 1)
        fused_depth = np.where(mask_normalized == 1, inverted_depth, 0)

        training_data.append((img_left, depth_normalized, mask_normalized, fused_depth))

    return training_data
training_data = compute_and_fuse_depth(stereo_images)
training_data.pop(0) # the mask of the first image is not well defined

In [None]:
# Sanity check for training data
def plot_training_data(training_data):
    num_samples = 5
    fig, axs = plt.subplots(num_samples, 4, figsize=(20, num_samples * 3))

    for idx, (original, depth_map, mask, fused_depth) in enumerate(training_data):
        if idx < num_samples:
            # Original Image
            ax_original = axs[idx, 0]
            im_original = ax_original.imshow(cv2.cvtColor(original, cv2.COLOR_BGR2RGB))
            ax_original.set_title('Original Image')
            ax_original.axis('off')  # Hide axes ticks
            fig.colorbar(im_original, ax=ax_original, orientation='vertical')

            # Depth Map
            ax_depth = axs[idx, 1]
            im_depth = ax_depth.imshow(depth_map, cmap='gray')
            ax_depth.set_title('Depth Map')
            ax_depth.axis('off')
            fig.colorbar(im_depth, ax=ax_depth, orientation='vertical')

            # Ground Truth Mask
            ax_mask = axs[idx, 2]
            im_mask = ax_mask.imshow(mask, cmap='gray')
            ax_mask.set_title('Ground Truth Mask')
            ax_mask.axis('off')
            fig.colorbar(im_mask, ax=ax_mask, orientation='vertical')

            # Fused Depth Map
            ax_fused = axs[idx, 3]
            im_fused = ax_fused.imshow(fused_depth, cmap='gray')
            ax_fused.set_title('Fused Depth Map')
            ax_fused.axis('off')
            fig.colorbar(im_fused, ax=ax_fused, orientation='vertical')


    plt.tight_layout()
    plt.show()
plot_training_data(training_data)


In [None]:
X = []  # Images
y = []  # Fused masks
for (img_left, depth_map, mask, fused_depth) in training_data:
    X.append(img_to_array(img_left))
    y.append(img_to_array(fused_depth))


# Convert lists to numpy arrays and normalize image data
X = np.array(X, dtype=np.float32) / 255.0
y = np.array(y, dtype=np.float32)


# Splitting the data into 80% for training + validation and 20% for testing
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.20, random_state=42)

train_data = (X_train, y_train)
validation_data = (X_val, y_val)

IMG_HEIGHT = X.shape[1]
IMG_WIDTH  = X.shape[2]
IMG_CHANNELS = X.shape[3]
input_shape = (IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)

print(f"Training set: {X_train.shape}, {y_train.shape}")
print(f"Validation set: {X_val.shape}, {y_val.shape}")
print(input_shape)


In [None]:
# Sanity check for training data
def plot_training_data(X, y, num_samples=5):
    plt.figure(figsize=(10, num_samples * 2))

    for i in range(num_samples):
        if i >= len(X):
            break
        ax = plt.subplot(num_samples, 2, 2*i+1)
        plt.imshow(X[i])
        ax.set_title('Original Image')
        ax.axis('off')

        ax = plt.subplot(num_samples, 2, 2*i+2)
        plt.imshow(y[i].squeeze(), cmap='gray')
        ax.set_title('Ground Truth')
        ax.axis('off')

    plt.tight_layout()
    plt.show()

plot_training_data(train_data[0], train_data[1], num_samples=5)


In [None]:
# Building Unet by dividing encoder and decoder into blocks
def conv_block(input, num_filters):
    conv_initializer = "he_normal"

    x = Conv2D(num_filters, 3, padding="same", kernel_initializer=conv_initializer)(input)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, 3, padding="same", kernel_initializer=conv_initializer)(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    return x


# Encoder block: Conv block followed by max-pooling
def encoder_block(input, num_filters):
    # each conv_block performs 2 conv operations then max-pooling with size 2
    s = conv_block(input, num_filters)  # s will be used as skip_features for concatenation to the decoder
    p = MaxPool2D((2, 2))(s)  # p will be used as the input of the next encoder block
    return s, p


# Decoder block
# skip features gets input from encoder for concatenation

# Concatenating feature maps allows the network to use fine-grained details captured in the encoder alongside the more
# abstract features generated in the decoder. This combination is vital for accurate localization and detailed
# segmentation because it brings together context (from deeper layers) and localization (from earlier layers).
def decoder_block(input, skip_features, num_filters):
    d = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input)  # up-sampling
    d = Concatenate()(
        [d, skip_features])  # merges the up-sampled feature maps with the corresponding feature maps from the encoder
    d = conv_block(d, num_filters)  # process the concatenated feature maps to refine features further
    return d


# Build Unet using the blocks
def build_unet(input_shape):
    inputs = Input(input_shape)

    # s1 is the output of the first conv_block ( skip_features that will later be concatenated with the corresponding decoder layer's output)
    # p1 is the output after the max-pooling layer following s1 (input to the next encoder block)
    s1, p1 = encoder_block(inputs, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)

    b = conv_block(p4, 1024)  # Bridge

    d1 = decoder_block(b, s4, 512)
    d2 = decoder_block(d1, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)

    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(d4)  # Binary (can be multiclass)

    model = Model(inputs, outputs, name="U-Net")
    return model



In [None]:
# Perform data augmentation by flipping the image horizontally

from tensorflow.keras.preprocessing.image import ImageDataGenerator
# Create an instance of ImageDataGenerator with horizontal flipping enabled
data_gen_args = dict(horizontal_flip=True,
                     fill_mode='nearest')  # fill_mode is used to fill in new pixels that can appear after a transformation

# Instantiate the ImageDataGenerator
image_datagen = ImageDataGenerator(**data_gen_args)
mask_datagen = ImageDataGenerator(**data_gen_args)


# Provide the same seed to the flow methods to ensure the transformations are the same for images and masks
seed = 1
batch_size = 16

# Prepare the generator for the images and masks
image_generator = image_datagen.flow(X_train, batch_size=batch_size, shuffle=True, seed=seed)
mask_generator = mask_datagen.flow(y_train, batch_size=batch_size, shuffle=True, seed=seed)

val_image_generator = image_datagen.flow(X_val, batch_size=batch_size, shuffle=True, seed=seed)
val_mask_generator = mask_datagen.flow(y_val, batch_size=batch_size, shuffle=True, seed=seed)

# Combine generators into one which yields image and masks
train_generator = zip(image_generator, mask_generator)
val_generator = zip(val_image_generator, val_mask_generator)


In [None]:
# Loss function
smooth = 1.

def dice_coef(y_true, y_pred):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

def dice_coef_loss(y_true, y_pred):
    return -dice_coef(y_true, y_pred)

def intersection_over_union(y_true, y_pred):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    union = K.sum(y_true_f) + K.sum(y_pred_f) - intersection
    return intersection / union

In [None]:
def unet_training(train_generator, validation_data, input_shape, batch_size, epochs):

    # Set up the model
    timestamp = time.strftime("%Y-%m-%d-%H-%M-%S")
    tensorboard_callback = TensorBoard(log_dir=f'logs/{timestamp}', histogram_freq=1)
    early_stopping_callback = EarlyStopping(monitor='val_dice_coef', patience=10, verbose=1, restore_best_weights=True)

    optimizer = Adam(learning_rate=0.001)
    model = build_unet(input_shape)  # Update input shape to match image size
    model.compile(optimizer=optimizer, loss=dice_coef_loss, metrics=['accuracy', dice_coef, intersection_over_union])
    # model.summary()


    # Train the model using the data generator
    history = model.fit(
        train_data[0],
        train_data[1],
        validation_data=validation_data,
        batch_size=batch_size,
        epochs=epochs,
        shuffle=True,
        verbose=1,
        steps_per_epoch=len(X_train) // batch_size,
        callbacks=[tensorboard_callback, early_stopping_callback]
    )

    # Plot training & validation loss values
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Val'], loc='upper left')
    plt.show()

    # Plot training & validation accuracy values
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Val'], loc='upper left')
    plt.show()
    return model


model = unet_training(  train_generator,
                        validation_data,
                        input_shape,
                        batch_size=8,
                        epochs=15,)

In [None]:
%tensorboard --logdir /content/logs
tensorboard_callback = TensorBoard(log_dir='/content/drive/My Drive/Colab Notebooks/logs')

In [None]:
# Evaluate Model

def load_data(test_image_folder):
    image_files = [f for f in os.listdir(test_image_folder) if f.endswith(('.png', '.jpg'))]
    image_files.sort()

    test_images = []
    for filename in image_files:
        image_path = os.path.join(test_image_folder, filename)
        if os.path.exists(image_path):
            image = cv2.imread(image_path, cv2.IMREAD_COLOR)
            img_resized = cv2.resize(image, (640, 192))
            test_images.append(img_resized)
        else:
            print(f"Error: No file found for {filename}")
    return test_images

def preprocess_images(images):
    processed_images = []
    for img in images:
        processed_images.append(img_to_array(img))
    return np.array(processed_images, dtype=np.float32) / 255.0


# Load images
test_images_left = load_data("/content/test/image_left")
test_images_right = load_data("/content/test/image_right")  # Adjusted to load right images

# Preprocess images
test_images_left_processed = preprocess_images(test_images_left)
# Predict using the preprocessed images
predictions = model.predict(test_images_left_processed)

for idx in range(30):
    fig, axs = plt.subplots(1, 2, figsize=(20, 10))

    # Display the original image
    original_img = cv2.cvtColor(test_images_left[idx], cv2.COLOR_BGR2RGB)
    axs[0].imshow(original_img)
    axs[0].set_title('Original Image')
    axs[0].axis('off')

    # Prepare the overlay
    predicted_mask = predictions[idx].squeeze()
    overlay = np.zeros_like(original_img, dtype='uint8')  # match the data type
    overlay[predicted_mask > 0.5] = [0, 255, 0]  # Green color in RGB

    # Apply the overlay
    overlay_image = cv2.addWeighted(original_img, 1, overlay, 0.5, 0)

    # Display the prediction overlay on the original image
    axs[1].imshow(overlay_image)
    axs[1].set_title('Original Image with Road Prediction Overlay')
    axs[1].axis('off')

    plt.show()

## 4. Fit a plane in 3D to the road pixels by using the depth of the pixels. Make sure your algorithm is robust to outliers.

In [None]:
from sklearn.linear_model import RANSACRegressor
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import RANSACRegressor

def extract_3d_coordinates(depth_map, mask, focal_length, px, py):
    # px, py are the principal points (usually the center of the image)
    if mask.ndim == 3 and mask.shape[2] == 1:
        mask = mask[:,:,0]  # This reduces it from (192, 640, 1) to (192, 640)

    indices = np.where(mask > 0)
    Z = depth_map[indices]
    X = (indices[1] - px) * Z / focal_length
    Y = (indices[0] - py) * Z / focal_length
    return np.vstack((X, Y, Z)).T

def fit_plane_RANSAC(points_3d):
    # Fit a plane using RANSAC
    ransac = RANSACRegressor()
    ransac.fit(points_3d[:, :2], points_3d[:, 2])
    return ransac

def plot_3d(points_3d, plane_model):
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    ax.scatter(points_3d[:, 0], points_3d[:, 1], points_3d[:, 2], c='r', marker='o')

    # Plot the plane
    xlim = ax.get_xlim()
    ylim = ax.get_ylim()
    X, Y = np.meshgrid(np.linspace(xlim[0], xlim[1], 10), np.linspace(ylim[0], ylim[1], 10))
    Z = plane_model.predict(np.hstack((X.reshape(-1, 1), Y.reshape(-1, 1))))
    Z = Z.reshape(X.shape)
    ax.plot_surface(X, Y, Z, alpha=0.5)

    plt.show()




In [None]:
principal_point_x = 609.5593
principal_point_y = 172.8540

focal_length = 721.5377
baseline = 0.5371505882506209

# Compute depth maps
depth_maps = [compute_depth_map(img_left, img_right, focal_length, baseline) for img_left, img_right in zip(test_images_left, test_images_right)]

# Compute road masks
road_masks = model.predict(test_images_left_processed)


def plot_images(images, titles, cmap=None, figsize=(20, 10)):
    plt.figure(figsize=figsize)
    for i, image in enumerate(images):
        plt.subplot(1, len(images), i + 1)
        plt.imshow(image, cmap=cmap)
        plt.title(titles[i])
        plt.axis('off')
    plt.show()

for i in range(10):
  plot_images([depth_maps[i], road_masks[i]], ['Depth Map', 'Road Mask'], cmap='gray')




## 5. Plot each pixel in 3D (we call this a 3D point cloud). On the same plot, show also the estimated ground plane.


In [None]:
# For each image
i = 0
for depth_map, mask in zip(depth_maps, road_masks):
    points_3d = extract_3d_coordinates(depth_map, mask, focal_length, principal_point_x, principal_point_y)
    plane_model = fit_plane_RANSAC(points_3d)
    plot_3d(points_3d, plane_model)
    if i == 10:
        break
    i += 1