In [None]:
import cv2
import matplotlib.pyplot as plt
import random
from PIL import Image
import tensorflow as tf
import numpy as np
import os
os.environ["SM_FRAMEWORK"] = "tf.keras"

from tensorflow import keras
import segmentation_models as sm

import matplotlib.pyplot as plt

In [None]:
# Dataset root and the split-list files that enumerate the images.
# Built with os.path.join so the notebook also runs on non-Windows systems,
# where a backslash is an ordinary filename character rather than a separator.
ROOT_DIR = os.path.join("..", "VIL100")
train_val_pth = os.path.join(ROOT_DIR, "data", "train.txt")
test_pth = os.path.join(ROOT_DIR, "data", "test.txt")

# Read training and validation images

In [None]:
# Read the training/validation list file: one image path per line.
with open(train_val_pth, 'r') as file:
    # Strip surrounding whitespace and skip blank lines. This handles the
    # trailing newline without blindly dropping the last entry, which would
    # discard a real path whenever the file does not end with a newline.
    train_val_img_pths = [line.strip() for line in file if line.strip()]

In [None]:
# Create a full path to each training/validation image.
# NOTE: this rebinds train_val_img_pths, so re-running the cell prefixes ROOT_DIR twice.
train_val_img_pths = [f'{ROOT_DIR}{pth}' for pth in train_val_img_pths]

# Derive each mask path: same layout under "Annotations", with a .png extension.
# Only the file extension is swapped, so a "jpg" substring elsewhere in the
# path (e.g. in a folder name) is left untouched.
train_val_mask_pths = [
    os.path.splitext(pth.replace("JPEGImages", "Annotations"))[0] + ".png"
    for pth in train_val_img_pths
]

In [None]:
# Inspect the size of the first training/validation image.
# The context manager closes the underlying file handle once the size is read.
with Image.open(train_val_img_pths[0]) as image:
    # PIL reports size as (width, height)
    width, height = image.size

print(f"Image size: {width} x {height}")

In [None]:
# Pair each image with its annotation so both are shuffled together
combined = list(zip(train_val_img_pths, train_val_mask_pths))

# Shuffle the pairs. A dedicated, seeded generator keeps the train/validation
# split reproducible across kernel restarts without touching the global RNG.
random.Random(42).shuffle(combined)

# Unzip the shuffled pairs back into the two lists consumed by the split cell.
# They must be assigned to the same names that are read later; unpacking into
# other names would silently discard the shuffle.
train_val_img_pths, train_val_mask_pths = (list(paths) for paths in zip(*combined))

In [None]:
# Fraction of the samples held out for validation
VALIDATION_SPLIT = 0.2

# The first `split_index` entries become the validation set, the remainder the training set
split_index = int(len(train_val_img_pths) * VALIDATION_SPLIT)
val_part = slice(None, split_index)
train_part = slice(split_index, None)

# Apply the same slices to images and masks so the pairs stay aligned
val_img_pths, val_mask_pths = train_val_img_pths[val_part], train_val_mask_pths[val_part]
train_img_pths, train_mask_pths = train_val_img_pths[train_part], train_val_mask_pths[train_part]

# Pre-process Data

In [None]:
def resize_image(image, target_size=(960, 528)):
    """Resize an image tensor and prepend a batch axis.

    Args:
        image: Image tensor accepted by `tf.image.resize`.
        target_size: Size forwarded unchanged to `tf.image.resize`, which
            interprets it as (new_height, new_width).

    Returns:
        The resized tensor with a new leading axis of length 1.
    """
    return tf.expand_dims(tf.image.resize(image, target_size), axis=0)

def resize_mask(mask, target_size=(960, 528)):
    """Resize a segmentation mask and prepend a batch axis.

    Nearest-neighbour interpolation is used so that every output pixel is a
    copy of an input pixel. The default bilinear method would blend
    neighbouring labels and turn a 0/1 mask into fractional values along
    lane edges.

    Args:
        mask: Mask tensor accepted by `tf.image.resize`.
        target_size: Size forwarded unchanged to `tf.image.resize`, which
            interprets it as (new_height, new_width).

    Returns:
        The resized mask as a float32 tensor with a new leading axis of length 1.
    """
    resized_mask = tf.image.resize(mask, target_size, method='nearest')
    # Nearest-neighbour keeps the input dtype; cast so the result is float32
    # for any input dtype, as it was with bilinear resizing.
    resized_mask = tf.cast(resized_mask, tf.float32)
    return tf.expand_dims(resized_mask, axis=0)  # Add batch dimension

def load_image(image_path):
    """Read a JPEG file and return it as a float32 RGB tensor.

    Args:
        image_path: Path of the JPEG file to read.

    Returns:
        A 3-channel float32 tensor. No resizing is done here.
    """
    raw_bytes = tf.io.read_file(image_path)
    rgb = tf.image.decode_jpeg(raw_bytes, channels=3)
    # convert_image_dtype rescales integer pixel values into [0, 1] for float output
    return tf.image.convert_image_dtype(rgb, tf.float32)

def load_mask(mask_path):
    """Read a PNG annotation and return it as a binary float mask.

    Args:
        mask_path: Path of the PNG file to read.

    Returns:
        A single-channel float32 tensor that is 1.0 wherever the decoded
        pixel is greater than zero and 0.0 elsewhere. No resizing is done here.
    """
    raw_bytes = tf.io.read_file(mask_path)
    gray = tf.image.decode_png(raw_bytes, channels=1)
    # Any non-zero label counts as foreground
    return tf.cast(gray > 0, tf.float32)

def load_image_and_mask(image_path, mask_path):
    """Load one image/mask pair and bring both to the default target size.

    Args:
        image_path: Path of the JPEG image.
        mask_path: Path of the matching PNG annotation.

    Returns:
        An (image, mask) tuple; each tensor is resized with the default
        target size of its resize helper and carries a leading batch axis.
    """
    image = resize_image(load_image(image_path))
    mask = resize_mask(load_mask(mask_path))
    return image, mask

# Training pipeline: slice the (image path, mask path) pairs, then load and resize them in parallel
train_dataset = (
    tf.data.Dataset
    .from_tensor_slices((train_img_pths, train_mask_pths))
    .map(load_image_and_mask, num_parallel_calls=tf.data.AUTOTUNE)
)

# Validation pipeline: built the same way from the validation paths
val_dataset = (
    tf.data.Dataset
    .from_tensor_slices((val_img_pths, val_mask_pths))
    .map(load_image_and_mask, num_parallel_calls=tf.data.AUTOTUNE)
)

# Train Models

In [None]:
# Encoder backbone for the segmentation model.
# Available backbones are listed at https://github.com/qubvel/segmentation_models
BACKBONE = 'resnet101'
# Preprocessing function that segmentation_models provides for the chosen backbone.
# NOTE(review): no visible cell applies `preprocess_input` to the data — confirm whether that is intended.
preprocess_input = sm.get_preprocessing(BACKBONE)

In [None]:
# Stop training once the validation loss has not improved for 3 consecutive
# epochs. Validation data is supplied to fit(), so the held-out loss is
# monitored rather than the training loss.
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

# Define the model: PSPNet with an ImageNet-pretrained encoder and a single
# sigmoid output channel (binary lane mask). input_shape matches the default
# target size used by the resize helpers.
model = sm.PSPNet(BACKBONE, input_shape=(960, 528, 3), encoder_weights='imagenet', classes=1, activation='sigmoid')
model.compile(
    'Adam',
    loss=sm.losses.dice_loss,
    metrics=[sm.metrics.f1_score],
)

# Fit the model.
# `batch_size` is deliberately not passed: Keras documents that it must not be
# specified for tf.data.Dataset inputs, because the dataset defines its own
# batching. Here every element already carries a batch axis of length 1.
history = model.fit(
    train_dataset,
    epochs=50,
    validation_data=val_dataset,
    callbacks=[callback],
)

In [None]:
# Print the summary of the model defined above
model.summary()

In [None]:
# Save the model to 'model.keras' (relative to the current working directory)
model.save('model.keras')
print('Model Saved!')

# Evaluate the Model

In [None]:
# https://www.tensorflow.org/guide/keras/training_with_built_in_methods

In [None]:
# Read the test list file: one image path per line.
with open(test_pth, 'r') as file:
    # Strip surrounding whitespace and skip blank lines. This handles the
    # trailing newline without blindly dropping the last entry, which would
    # discard a real path whenever the file does not end with a newline.
    test_img_pths = [line.strip() for line in file if line.strip()]

In [None]:
# Create a full path for each image in the testing set.
# NOTE: this rebinds test_img_pths, so re-running the cell prefixes ROOT_DIR twice.
test_img_pths = [f'{ROOT_DIR}{pth}' for pth in test_img_pths]

# Derive each mask path: same layout under "Annotations", with a .png extension.
# Only the file extension is swapped, so a "jpg" substring elsewhere in the
# path (e.g. in a folder name) is left untouched.
test_mask_pths = [
    os.path.splitext(pth.replace("JPEGImages", "Annotations"))[0] + ".png"
    for pth in test_img_pths
]

In [None]:
# Test pipeline: slice the (image path, mask path) pairs, then load and resize each pair
test_dataset = (
    tf.data.Dataset
    .from_tensor_slices((test_img_pths, test_mask_pths))
    .map(load_image_and_mask)
)

In [None]:
# Evaluate the model on the test dataset; the cell output shows the returned value(s)
model.evaluate(test_dataset)

## Visualizations from the VIL-100 test set

Predict the lane mask for a single test image

In [None]:
# Show the model's input tensor specification
print("Model input:", model.input)  # Tells you what kind of input the model expects

In [None]:
# Example of investigating the 100th image in the testing set
SAMPLE_NUMBER = 100  # 1-based position of the test sample to inspect

# Skip the preceding samples and take exactly one element. Each element is an
# (image, mask) pair whose tensors already carry a batch axis of length 1.
data = next(iter(test_dataset.skip(SAMPLE_NUMBER - 1).take(1)), None)
if data is None:
    # Fail loudly instead of leaving `predictions` undefined (or stale) for the cells below
    raise IndexError(f"The test set has fewer than {SAMPLE_NUMBER} samples")

# The first item of the pair is the model input
image_data = data[0]
predictions = model.predict(image_data)

In [None]:
## Draw the binary mask image

threshold = 0.5

# Take the first (and only) prediction of the batch, binarise it
# (>= threshold -> 1.0, otherwise 0.0) and drop the single channel axis
image_data = (predictions[0] >= threshold).astype(np.float64).squeeze()

# Render the mask with a grayscale colour map, without axis ticks or labels
plt.imshow(image_data, cmap='gray')
plt.axis('off')
plt.show()

In [None]:
# Convert the thresholded mask to uint8 (it is later passed as the `mask` argument of cv2.bitwise_and)
mask_image = image_data.astype(np.uint8)
# Display the mask's shape as the cell output
mask_image.shape

In [None]:
# Take the input image of the selected (image, mask) pair, drop its batch axis
# and convert it to a NumPy array
org_image = tf.squeeze(data[0], axis=0).numpy()
org_image.shape

In [None]:
# Display the original image and save a copy to disk
os.makedirs('img', exist_ok=True)  # savefig does not create missing directories

plt.imshow(org_image)
plt.axis('off')  # Hide axis ticks and labels
# Save before plt.show(): once the figure has been shown it is released, and a
# later savefig would write an empty canvas.
plt.savefig('img/normal_1.png')
plt.show()

Display the image with the detected lanes

In [None]:
# Ensure the base image and mask are the same size
assert org_image.shape[:2] == mask_image.shape, "The base image and mask must have the same dimensions."

# org_image comes from load_image, i.e. it is an RGB float image in [0, 1].
# The overlay therefore uses channel 0 (red in RGB) and 1.0 (the float maximum).
red_overlay = np.zeros_like(org_image)
red_overlay[:, :, 0] = 1.0

# Keep the red overlay only where the mask is non-zero
red_masked = cv2.bitwise_and(red_overlay, red_overlay, mask=mask_image)

# Add the overlay onto the base image and clip back into the valid [0, 1] range
result_image = np.clip(cv2.addWeighted(org_image, 1.0, red_masked, 1.0, 0), 0.0, 1.0)

# The image is already RGB, so it is shown as-is; a BGR->RGB conversion here
# would swap the red and blue channels of the photo.
plt.imshow(result_image)
plt.axis('off')  # Hide axis ticks and labels
plt.show()

## Visualization on an external image (not part of VIL-100)

In [None]:
# Get a prediction for an external picture.
# The path is built with os.path.join so it also resolves on non-Windows systems.
base_image_path = os.path.join("..", "curve_lane.jpg")

# Load the picture and resize it to the size the model was built for
# (a batch axis is added by resize_image)
base_image = load_image(base_image_path)
base_image = resize_image(base_image, (960, 528))

predictions = model.predict(base_image)

In [None]:
# Display the shape of the resized external image (including the batch axis added by resize_image)
base_image.shape

In [None]:
threshold = 0.5

# Take the first (and only) prediction of the batch, binarise it
# (>= threshold -> 1.0, otherwise 0.0) and drop the single channel axis
image_data = (predictions[0] >= threshold).astype(np.float64).squeeze()

# Render the mask with a grayscale colour map, without axis ticks or labels
plt.imshow(image_data, cmap='gray')
plt.axis('off')
plt.show()

In [None]:
# Convert the thresholded mask to uint8 (it is later passed as the `mask` argument of cv2.bitwise_and)
mask_image = image_data.astype(np.uint8)
# Display the mask's shape as the cell output
mask_image.shape

In [None]:
# Remove the batch axis and convert the image to a NumPy array.
# NOTE(review): this rebinds `base_image`, so the cell is not idempotent —
# re-running it squeezes an array that no longer has a length-1 leading axis.
base_image = tf.squeeze(base_image, axis=0).numpy()
# Display the resulting shape as the cell output
base_image.shape

In [None]:
# Ensure the base image and mask are the same size
assert base_image.shape[:2] == mask_image.shape, "The base image and mask must have the same dimensions."

# base_image comes from load_image, i.e. it is an RGB float image in [0, 1].
# The overlay therefore uses channel 0 (red in RGB) and 1.0 (the float maximum).
red_overlay = np.zeros_like(base_image)
red_overlay[:, :, 0] = 1.0

# Keep the red overlay only where the mask is non-zero
red_masked = cv2.bitwise_and(red_overlay, red_overlay, mask=mask_image)

# Add the overlay onto the base image and clip back into the valid [0, 1] range
result_image = np.clip(cv2.addWeighted(base_image, 1.0, red_masked, 1.0, 0), 0.0, 1.0)

# Original size: (1080, 1920, 3), i.e. height 1080 and width 1920.
# cv2.resize takes dsize as (width, height), so the target is (1920, 1080).
result_image = cv2.resize(result_image, (1920, 1080), interpolation=cv2.INTER_AREA)

# The image is already RGB, so it is shown as-is; a BGR->RGB conversion here
# would swap the red and blue channels of the photo.
plt.imshow(result_image)
plt.axis('off')  # Hide axis ticks and labels
plt.show()
