In [None]:
!pip install albumentations segmentation-models-pytorch
!pip install torch torchvision

In [None]:
import os
import json
import cv2
import numpy as np
from matplotlib import pyplot as plt
import albumentations as A
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import segmentation_models_pytorch as smp
import time

In [None]:
images_path = '/home/vivekp22/Documents/Images/'
polygons_path = '/home/vivekp22/Documents/Polygon/'

output_path = 'output_masks/'
os.makedirs(output_path, exist_ok=True)

In [None]:
transform = A.Compose([
    A.RandomRotate90(),
    A.Flip(),
    A.RandomBrightnessContrast(),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
])

In [None]:
model = smp.Unet(
    encoder_name="resnet34",  # Choose an encoder
    encoder_weights="imagenet",  # Use pre-trained weights
    in_channels=3,  # Input channels
    classes=1  # Number of output classes
)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
model.eval()  # Set model to evaluation mode

In [None]:
import json

def extract_road_labels(json_file):
    """
    Extracts the "Road" labels from the given JSON file.

    Args:
    json_file (str): Path to the JSON file.

    Returns:
    dict: A dictionary containing the "Road" labels.
    """
    try:
        with open(json_file, 'r') as file:
            data = json.load(file)
    except FileNotFoundError:
        print(f"File {json_file} not found.")
        return []
    except json.JSONDecodeError:
        print(f"Failed to parse JSON file {json_file}.")
        return []

    road_labels = []
    for annotation in data.get("annotations", []):
        if annotation.get("category_id") == 1 and annotation.get("category_id") is not None:
            road_label = {
                'image_id': annotation.get("image_id"),
                'category_id': annotation.get("category_id"),
                'segmentation': annotation.get("segmentation"),
                'area': annotation.get("area"),
                'bbox': annotation.get("bbox"),
            }
            road_labels.append(road_label)

    return road_labels

In [None]:
import json
import numpy as np

def create_road_mask(json_file):
    """
    Creates a road mask based on the annotations in the given JSON file.

    Args:
    json_file (str): Path to the JSON file.

    Returns:
    list: A list of road masks, where each mask is a 2D numpy array.
    """
    try:
        with open(json_file, 'r') as file:
            data = json.load(file)
    except FileNotFoundError:
        print(f"File {json_file} not found.")
        return []
    except json.JSONDecodeError:
        print(f"Failed to parse JSON file {json_file}.")
        return []

    road_masks = []
    for image in data.get("images", []):
        image_id = image.get("id")
        width = image.get("width")
        height = image.get("height")

        road_mask = np.zeros((height, width), dtype=np.uint8)
        for annotation in data.get("annotations", []):
            if annotation.get("image_id") == image_id and annotation.get("category_id") == 1:
                segmentation = np.array(annotation.get("segmentation")[0]).reshape((-1, 2)).astype(np.int32)

                for i, x_y in enumerate(segmentation):
                    cv_x, cv_y = x_y
                    if 0 <= cv_x < width and 0 <= cv_y < height:
                        road_mask[cv_y, cv_x] = 255
                    for next_i in ((i + 1) % len(segmentation), (i + len(segmentation) - 1) % len(segmentation)):
                        next_x_y = segmentation[next_i]
                        next_cv_x, next_cv_y = next_x_y
                        if 0 <= next_cv_x < width and 0 <= next_cv_y < height and 0 <= cv_x < width and 0 <= cv_y < height:
                            cv2.line(road_mask, (cv_x, cv_y), (next_cv_x, next_cv_y), 255, 1)

        road_masks.append(road_mask)

    return road_masks

In [None]:
# Process Images
for image_name in os.listdir(images_path):
    if image_name.endswith('.jpg'):
        start_time = time.time()
        json_file = os.path.join(polygons_path, image_name.replace('.png', '.json'))

        if os.path.exists(json_file):
            polygons = extract_road_labels(json_file)
            image_path = os.path.join(images_path, image_name)
            image = cv2.imread(image_path)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            # Pad the image to ensure dimensions are multiples of 32
            height, width, _ = image.shape
            new_height = (height // 32 + 1) * 32 if height % 32 != 0 else height
            new_width = (width // 32 + 1) * 32 if width % 32 != 0 else width
            image = cv2.copyMakeBorder(image, 0, new_height - height, 0, new_width - width, cv2.BORDER_CONSTANT, value=[0, 0, 0]) # Pad with black pixels

            # Apply transformations
            transformed = transform(image=image)
            image_transformed = transformed["image"]

            # Add batch dimension and move to device
            #image_transformed = image_transformed.unsqueeze(0).to(device)
            image_transformed = torch.from_numpy(image_transformed).permute(2, 0, 1).unsqueeze(0).to(device)

            '''
            "Use the below part, if program is running through GPU."

            Prediction
            with torch.no_grad():
                prediction = model(image_transformed)

            Remove batch dimension and move to CPU
            prediction = prediction.squeeze(0).cpu().numpy()
            '''

            '''
            Comment the below line, if program is running through GPU.
            '''

            prediction = np.zeros((height, width, 3), dtype=np.uint8)

            # Process prediction (e.g., thresholding, argmax for multi-class)
            # Get the number of output channels from the model definition
            num_classes = 1
            if num_classes > 1:
                # For multi-class segmentation, get class with highest probability
                predicted_mask = np.argmax(prediction, axis=0)
            else:
                # For binary segmentation, threshold the prediction
                predicted_mask = (prediction > 0.5).astype(np.uint8)

            # Visualization and saving
            predicted_mask = np.zeros((new_height, new_width, 3), dtype=np.uint8) # Update mask dimensions

            for obj in polygons:
                polygon = np.array(obj['polygon'], np.int32)
                polygon = polygon.reshape((-1, 1, 2))
                if any(item in obj['label'] for item in ('Road', 'sidewalk', 'driveway', 'crosswalk', 'pedestrian-area')):
                    color = (255, 255, 0) # Deep Yellow for road surfaces
                elif 'snow' in obj['label']:
                    color = (255, 255, 255) # White for snow
                else:
                    color = (0, 0, 0) # Black for other objects
                cv2.fillPoly(predicted_mask, [polygon], color)

            end_time = time.time()
            processing_time = end_time - start_time

            # Create a new mask for only road surfaces
            road_only_mask = np.zeros((new_height, new_width, 3), dtype=np.uint8) # Update mask dimensions

            for obj in polygons:
                polygon = np.array(obj['polygon'], np.int32)
                polygon = polygon.reshape((-1, 1, 2))
                if any(item in obj['label'] for item in ('Road', 'sidewalk', 'driveway', 'crosswalk', 'pedestrian-area')):
                    color = (255, 255, 0) # Deep Yellow for road surfaces
                    cv2.fillPoly(road_only_mask, [polygon], color)
                else:
                  pass

            # Save the road-only mask image
            road_masks_path = os.path.join(output_path, 'road_masks')
            if not os.path.exists(road_masks_path):
                os.makedirs(road_masks_path)
            cv2.imwrite(os.path.join(road_masks_path, f"{image_name.replace('.png', '_road_only_mask.png')}"), road_only_mask)

            # Plotting and visualization (can be removed)
            plt.figure(figsize=(18, 6))
            plt.subplot(1, 3, 1)
            plt.imshow(image)
            plt.title(f'Original Image: {image_name}')
            plt.axis('off')
            plt.subplot(1, 3, 2)
            plt.imshow(predicted_mask)
            plt.title(f'Predicted Mask\n(Processing Time: {processing_time:.2f}s)')
            plt.axis('off')
            plt.subplot(1, 3, 3)
            plt.imshow(road_only_mask)
            plt.title('Road Surfaces Only')
            plt.axis('off')
            plt.savefig(os.path.join(output_path, image_name.replace('.png', '_mask.png')))
            plt.show()
            plt.close()


In [None]:
import cv2
import numpy as np
import json
import matplotlib.pyplot as plt
import os

def create_road_mask(image_name, json_file):
    # Try to read the image
    image_path = f"Images/{image_name}"
    image = cv2.imread(image_path)

    # Check if image is loaded successfully
    if image is None:
        print(f"Error: Unable to open image file {image_path}")
        return None

    height, width, _ = image.shape
    road_mask = np.zeros((height, width), dtype=np.uint8)

    # Load JSON file
    try:
        with open(json_file, 'r') as file:
            json_data = json.load(file)
    except FileNotFoundError:
        print(f"File {json_file} not found.")
        return None
    except json.JSONDecodeError:
        print(f"Failed to parse JSON file {json_file}.")
        return None

    # Iterate over all annotations in the JSON file
    for annotation in json_data.get("annotations", []):
        # Check if annotation belongs to the "Road" category
        if annotation.get("category_id") == 1:
            # Load segmentation data
            segmentation = np.array(annotation.get("segmentation")[0]).reshape((-1, 2)).astype(np.int32)
            # Create a polygon from the segmentation data
            polygon = segmentation.reshape((-1, 1, 2)).astype(np.int32)
            # Draw the polygon on the road mask
            cv2.fillPoly(road_mask, [polygon], 255)

    return road_mask

# Iterate over all JSON files in the directory
for json_file in os.listdir('Polygon/'):
    if json_file.endswith('.json'):
        # Find the corresponding image file
        image_name = json_file.replace('.json', '.png')

        # Create the road mask
        road_mask = create_road_mask(image_name, f"Polygon/{json_file}")

        if road_mask is not None:
            # Save the mask image
            cv2.imwrite(f"Masks/{image_name}", road_mask)

            # Display the mask image using matplotlib
            plt.imshow(road_mask, cmap='gray')
            plt.title(f'Mask for {image_name}')
            plt.show()

In [None]:
import cv2
import numpy as np
import json
import matplotlib.pyplot as plt
import os

def create_road_mask(image_name, json_file):
    # Try to read the image
    image_path = f"Images/{image_name}"
    image = cv2.imread(image_path)

    # Check if image is loaded successfully
    if image is None:
        print(f"Error: Unable to open image file {image_path}")
        return None

    height, width, _ = image.shape
    road_mask = np.zeros((height, width), dtype=np.uint8)

    # Load JSON file
    try:
        with open(json_file, 'r') as file:
            json_data = json.load(file)
    except FileNotFoundError:
        print(f"File {json_file} not found.")
        return None
    except json.JSONDecodeError:
        print(f"Failed to parse JSON file {json_file}.")
        return None

    # Iterate over all annotations in the JSON file
    for annotation in json_data.get("annotations", []):
        # Check if annotation belongs to the "Road" category
        if annotation.get("category_id") == 1:
            # Load segmentation data
            segmentation = np.array(annotation.get("segmentation")[0]).reshape((-1, 2)).astype(np.int32)
            # Create a polygon from the segmentation data
            polygon = segmentation.reshape((-1, 1, 2)).astype(np.int32)
            # Draw the polygon on the road mask
            cv2.fillPoly(road_mask, [polygon], 255)

    return road_mask

# Create the output directory if it does not exist
output_dir = 'output_masks'
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Iterate over all JSON files in the directory
for json_file in os.listdir('Polygon/'):
    if json_file.endswith('.json'):
        # Find the corresponding image file
        image_name = json_file.replace('.json', '.png')

        # Create the road mask
        road_mask = create_road_mask(image_name, os.path.join('Polygon', json_file))

        if road_mask is not None:
            # Save the mask image
            cv2.imwrite(os.path.join(output_dir, image_name), road_mask)

            # Display the mask image using matplotlib
            plt.imshow(road_mask, cmap='gray')
            plt.title(f'Mask for {image_name}')
            plt.show()

In [None]:
import cv2
import numpy as np
import os

# Define the path to the road_masks directory
road_masks_path = 'output_masks'

# Prompt the user to enter the directory path of the original images
original_images_path = 'Images'

# Create a new directory to save the processed images
processed_images_path = 'processed_images'
if not os.path.exists(processed_images_path):
    os.makedirs(processed_images_path)

# Iterate over the images in the road_masks directory
for filename in os.listdir(road_masks_path):
    # Load the masked image
    masked_image_path = os.path.join(road_masks_path, filename)
    masked_image = cv2.imread(masked_image_path)

    # Convert the masked image to a binary mask by thresholding the white color
    gray_masked_image = cv2.cvtColor(masked_image, cv2.COLOR_BGR2GRAY)
    binary_mask = cv2.threshold(gray_masked_image, 128, 255, cv2.THRESH_BINARY)[1]

    # Find the corresponding original image
    original_image_filename = filename.replace('_road_only_mask.png', '.jpg')
    for file in os.listdir(original_images_path):
        if file == original_image_filename:
            original_image_path = os.path.join(original_images_path, file)
            original_image = cv2.imread(original_image_path)

            # Resize the binary mask to the same size as the original image
            binary_mask = cv2.resize(binary_mask, (original_image.shape[1], original_image.shape[0]))

            # Create a black image with the same shape as the original image
            black_image = np.zeros_like(original_image)

            # Perform a bitwise AND operation between the original image and the binary mask
            result = cv2.bitwise_and(original_image, original_image, mask=binary_mask)

            # Create a new image with the road surface in natural form and everything else in black
            new_image = result | black_image

            # Save the new image in the processed_images directory
            cv2.imwrite(os.path.join(processed_images_path, file), new_image)
            print(f"Image processed and saved for {filename}")
            break
    else:
        print(f"Original image not found for {filename}")




In [None]:
plt.figure(figsize=(18, 6))

plt.subplot(1, 3, 1)
plt.imshow(image)
plt.title(f'Original Image: {image_name}')
plt.axis('off')

plt.subplot(1, 3, 2)
plt.imshow(predicted_mask)
plt.title(f'Predicted Mask\n(Processing Time: {processing_time:.2f}s)')
plt.axis('off')

road_only_mask = np.zeros((new_height, new_width, 3), dtype=np.uint8) # Update mask dimensions

for obj in polygons:
    polygon = np.array(obj['segmentation'][0]).reshape((-1, 2)).astype(np.int32)
    polygon = polygon.reshape((-1, 1, 2))
    color = (255, 255, 0) # Deep Yellow for road surfaces
    cv2.fillPoly(road_only_mask, [polygon], color)

plt.subplot(1, 3, 3)  # Subplot for road-only mask
plt.imshow(road_only_mask)  # Display the road-only mask
plt.title('Road Surfaces Only')
plt.axis('off')

plt.savefig(os.path.join(output_path, image_name.replace('.jpg', '_mask.png')))
plt.show()

plt.close()