## Dependencies

In [None]:
!pip install pycocotools

In [None]:
import argparse
import os
import time

import cv2
import numpy as np
import pycocotools.coco as pyco

from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches

import torch
from torchvision.transforms import Compose, ToTensor, Resize
from torchvision.models.detection import fasterrcnn_resnet50_fpn, maskrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FasterRCNN
from torchvision.models import resnet18, resnet50, resnet101,\
    ResNet101_Weights, ResNet18_Weights, ResNet50_Weights
from torchvision.models.detection.anchor_utils import AnchorGenerator
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR


## Dataset Class

In [None]:
class SDSDataset(Dataset):
    """COCO-annotated detection dataset that yields resized images and targets.

    Every item is an ``(image, annotation)`` pair. ``image`` is the tensor
    produced by ``Resize`` followed by ``ToTensor``; ``annotation`` is a dict
    with the keys ``boxes``, ``labels``, ``image_id``, ``area``, ``iscrowd``,
    ``org_h`` and ``org_w``.

    Args:
        root: Directory that contains the image files.
        annotation_file: Path to the COCO-format annotation JSON.
        resize: Target size as ``(height, width)``. It is handed to ``Resize``
            and is also used to rescale the bounding boxes.
    """

    def __init__(self, root, annotation_file, resize):
        self.root = root
        self.coco = pyco.COCO(annotation_file)
        self.ids = list(self.coco.imgs.keys())
        # Number of categories listed in the annotation file.
        # NOTE(review): torchvision's FasterRCNN counts the background as an
        # extra class - confirm this value matches the category id range.
        self.num_classes = len(self.coco.cats)
        self.resize = resize
        self.transform = Compose([
            Resize(resize),
            ToTensor()
            ])

    def __len__(self):
        """Return the number of images listed in the annotation file."""
        return len(self.ids)

    @staticmethod
    def _rescale_boxes(coco_annotation, org_width, org_height, resize):
        """Convert COCO boxes to resized ``[xmin, ymin, xmax, ymax]`` boxes.

        Args:
            coco_annotation: List of annotation dicts with a ``bbox`` entry in
                COCO format ``[xmin, ymin, width, height]``.
            org_width: Width of the original image in pixels.
            org_height: Height of the original image in pixels.
            resize: Target size as ``(height, width)``.

        Returns:
            A float32 tensor of shape ``(N, 4)``. The shape is ``(0, 4)`` when
            there are no annotations, so that consumers expecting an ``[N, 4]``
            box tensor also accept images without objects.
        """
        x_ratio = org_width / resize[1]
        y_ratio = org_height / resize[0]

        boxes = []
        for ann in coco_annotation:
            x, y, width, height = ann['bbox'][:4]
            xmin = x / x_ratio
            ymin = y / y_ratio
            boxes.append([xmin, ymin, xmin + width / x_ratio, ymin + height / y_ratio])

        # reshape(-1, 4) keeps the second dimension even for an empty list
        return torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)

    def __getitem__(self, index):
        """Load the image at ``index`` together with its rescaled annotations."""
        coco = self.coco

        # Image ID of the input image
        img_id = self.ids[index]
        # Load all annotations that belong to the input image
        coco_annotation = coco.loadAnns(coco.getAnnIds(img_id))
        # Get path for the input image
        path = coco.loadImgs(img_id)[0]['file_name']

        # Open the input image; the context manager releases the file handle and
        # convert('RGB') guarantees three channels for grayscale/RGBA/palette files
        with Image.open(os.path.join(self.root, path)) as image_file:
            org_image = image_file.convert('RGB')

        # Get size of input image
        org_height = org_image.height
        org_width = org_image.width

        # Apply transformation (resize) to input image
        image = self.transform(org_image)

        # Get number of objects in the input image
        num_objects = len(coco_annotation)

        # Coco format: bbox = [xmin, ymin, width, height]
        # Pytorch format: bbox = [xmin, ymin, xmax, ymax]
        boxes = self._rescale_boxes(coco_annotation, org_width, org_height, self.resize)
        labels = torch.as_tensor(
            [ann['category_id'] for ann in coco_annotation], dtype=torch.int64)
        img_id = torch.tensor([img_id])

        # Areas are copied verbatim from the annotation file, i.e. they refer to
        # the original image and are NOT rescaled like the boxes.
        # NOTE(review): confirm downstream consumers expect original-size areas.
        areas = torch.as_tensor(
            [ann['area'] for ann in coco_annotation], dtype=torch.float32)

        # Every object is marked as non-crowd
        iscrowd = torch.zeros((num_objects,), dtype=torch.int64)

        # Create annotation dictionary
        annotation = {
            'boxes': boxes,
            'labels': labels,
            'image_id': img_id,
            'area': areas,
            'iscrowd': iscrowd,
            # Save width and height of the original image to rescale bounding boxes later on
            'org_h': torch.as_tensor(org_height, dtype=torch.int64),
            'org_w': torch.as_tensor(org_width, dtype=torch.int64),
        }

        return image, annotation

## Reading Data

In [None]:
# All dataset paths live below one common Kaggle input directory.
dataset_root = '/kaggle/input/sds-dataset/compressed'

# Image folders (training and validation split)
train_data_dir = f'{dataset_root}/images/train'
test_data_dir = f'{dataset_root}/images/val'

# COCO annotation files belonging to the two splits
train_annotation_dir = f'{dataset_root}/annotations/instances_train.json'
test_annotation_dir = f'{dataset_root}/annotations/instances_val.json'

In [None]:
# Report CUDA availability, then pick the GPU when present and the CPU otherwise
cuda_available = torch.cuda.is_available()
print(f'Cuda available: {cuda_available}')
device = torch.device('cuda' if cuda_available else 'cpu')

In [None]:
# Number of samples per batch used by both data loaders
batch_size = 32
# Target image size handed to the datasets, indexed there as (height, width)
resize = (256, 256)

In [None]:
# Create Datasets: training split first, then the validation split
train_dataset, test_dataset = (
    SDSDataset(image_dir, annotation_path, resize)
    for image_dir, annotation_path in (
        (train_data_dir, train_annotation_dir),
        (test_data_dir, test_annotation_dir),
    )
)

In [None]:
# Create Dataloader
def collate_fn(batch):
    """Transpose a batch of samples into one tuple per sample field.

    A list of ``(image, annotation)`` pairs becomes
    ``((image_0, image_1, ...), (annotation_0, annotation_1, ...))``, which
    keeps the individual items unstacked.
    """
    return (*zip(*batch),)

# Both loaders use identical batching options: shuffled batches of
# `batch_size` samples that are grouped by `collate_fn`.
loader_options = dict(batch_size=batch_size,
                      shuffle=True,
                      collate_fn=collate_fn)

data_loader_train = DataLoader(train_dataset, **loader_options)
data_loader_test = DataLoader(test_dataset, **loader_options)

## Visualizing Image and Annotations

In [None]:
# Fetch a single training sample (index 1000 is hard-coded)
image, annotations = train_dataset[1000]

# Move the first tensor axis to the end (permute(1, 2, 0)) so imshow can draw it
image_np = image.permute(1, 2, 0).numpy()
# Bounding boxes and labels as NumPy arrays
boxes = annotations['boxes'].numpy()
labels = annotations['labels'].numpy()

fig, ax = plt.subplots(1)
ax.imshow(image_np)

# Draw one red rectangle per box and write its label at the top-left corner
for (x1, y1, x2, y2), label in zip(boxes, labels):
    outline = patches.Rectangle((x1, y1), x2-x1, y2-y1,
                                linewidth=1, edgecolor='r', facecolor='none')
    ax.add_patch(outline)
    plt.text(x1, y1, str(label), color='white')

plt.axis('off')
plt.show()

In [None]:
# Bare expression: in a notebook this echoes the annotation dict of the sample loaded above
annotations

# **Filtering**

In [None]:
def _list_image_files(folder_path, num_images):
    """Return up to ``num_images`` image file names of ``folder_path``, sorted by name."""
    image_extensions = ('.jpg', '.jpeg', '.png')
    # Require the dot and ignore case, so 'IMG.JPG' matches but 'notesjpg' does not
    names = sorted(
        name for name in os.listdir(folder_path)
        if name.lower().endswith(image_extensions)
    )
    return names[:num_images]


def apply_filters_to_selected_images(folder_path, num_images=5):
    """Show Gaussian, median and bilateral filtering for a few images of a folder.

    The first ``num_images`` image files (sorted by name, so the selection is
    reproducible) are read, filtered and displayed next to the original in a
    2x2 figure each. Files that OpenCV cannot read are reported and skipped.

    Args:
        folder_path: Directory that is scanned for ``.jpg``/``.jpeg``/``.png`` files.
        num_images: Maximum number of images to process.

    Returns:
        None. The function only displays figures.
    """
    for image_file in _list_image_files(folder_path, num_images):
        image_path = os.path.join(folder_path, image_file)

        # Load image; cv2.imread signals an unreadable file by returning None
        image = cv2.imread(image_path)
        if image is None:
            print(f"Skipping unreadable image: {image_path}")
            continue
        # OpenCV loads BGR, matplotlib expects RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Apply filters
        gaussian_filtered = cv2.GaussianBlur(image, (5, 5), 0)
        median_filtered = cv2.medianBlur(image, 5)
        bilateral_filtered = cv2.bilateralFilter(image, 9, 75, 75)

        # Display results
        panels = [
            (image, f"Original Image: {image_file}"),
            (gaussian_filtered, "Gaussian Filter"),
            (median_filtered, "Median Filter"),
            (bilateral_filtered, "Bilateral Filter"),
        ]
        fig, axs = plt.subplots(2, 2, figsize=(10, 10))
        for ax, (picture, title) in zip(axs.ravel(), panels):
            ax.imshow(picture)
            ax.set_title(title)
            ax.axis("off")
        plt.show()

In [None]:
def segment_and_evaluate(image_path):
    """Segment one image three ways and score each result against Canny edges.

    The reference ("ground truth") is the Canny edge map of the grayscale
    image, dilated with ``disk(1)``. For each method the Canny edges of its
    output are compared with that reference through a binary Jaccard score.
    The image, the three results and the reference are shown in a 2x3 figure.

    NOTE(review): ``rgb2gray``, ``KMeans``, ``sobel``, ``felzenszwalb``,
    ``canny``, ``dilation``, ``disk`` and ``jaccard_score`` are not imported in
    the visible part of the file - presumably scikit-image / scikit-learn;
    confirm the imports exist.

    Args:
        image_path: Path of the image file to read with OpenCV.

    Returns:
        Tuple ``(kmeans_acc, meanshift_acc, graph_acc)`` of Jaccard scores.
    """
    rgb = cv2.cvtColor(cv2.imread(image_path), cv2.COLOR_BGR2RGB)
    gray = rgb2gray(rgb)

    # K-Means clustering of the raw RGB pixel values into three clusters
    pixels = rgb.reshape((-1, 3))
    clustering = KMeans(n_clusters=3, random_state=42, n_init=10).fit(pixels)
    kmeans_segmented = clustering.labels_.reshape(rgb.shape[:2])

    # NOTE(review): this result is labelled "Mean Shift" in the figure, but it
    # is computed with sobel(gray) - confirm which method is intended.
    mean_shift_segmented = sobel(gray)

    # Graph-based segmentation
    graph_segmented = felzenszwalb(rgb, scale=100, sigma=0.5, min_size=50)

    # Reference edges: dilated Canny edges of the grayscale image
    gt_edges = dilation(canny(gray), disk(1))
    reference = gt_edges.flatten()

    def edge_overlap(edge_source):
        # Jaccard score between the reference edges and the Canny edges of one result
        return jaccard_score(reference, canny(edge_source).flatten(), average='binary')

    kmeans_acc = edge_overlap(kmeans_segmented.astype(float))
    meanshift_acc = edge_overlap(mean_shift_segmented)
    graph_acc = edge_overlap(graph_segmented.astype(float))

    # Display results; the sixth panel of the 2x3 grid stays empty
    panels = [
        (rgb, "Original Image", None),
        (kmeans_segmented, f"K-Means (Acc: {kmeans_acc:.3f})", 'gray'),
        (mean_shift_segmented, f"Mean Shift (Acc: {meanshift_acc:.3f})", 'gray'),
        (graph_segmented, f"Graph-Based (Acc: {graph_acc:.3f})", 'gray'),
        (gt_edges, "Ground Truth Edges", 'gray'),
    ]
    fig, axs = plt.subplots(2, 3, figsize=(15, 10))
    for ax, (picture, title, colormap) in zip(axs.ravel(), panels):
        ax.imshow(picture, cmap=colormap)
        ax.set_title(title)
    for ax in axs.ravel():
        ax.axis("off")
    plt.show()

    return kmeans_acc, meanshift_acc, graph_acc

In [None]:
# Run the segmentation comparison on the first five image files of the training folder
image_names = filter(lambda name: name.endswith(('jpg', 'png', 'jpeg')),
                     os.listdir(train_data_dir))
image_files = [os.path.join(train_data_dir, name) for name in image_names][:5]

for image_file in image_files:
    kmeans_acc, meanshift_acc, graph_acc = segment_and_evaluate(image_file)
    print(
        f"Image: {os.path.basename(image_file)}\n"
        f"K-Means Accuracy: {kmeans_acc:.3f}, "
        f"Mean Shift Accuracy: {meanshift_acc:.3f}, "
        f"Graph-Based Accuracy: {graph_acc:.3f}\n"
    )

# Region Growing Algorithm

In [None]:
# Load Faster R-CNN Model
# `weights="DEFAULT"` replaces the deprecated `pretrained=True` flag and matches
# the `weights=` style already used for the ResNet backbone in this notebook.
faster_rcnn = fasterrcnn_resnet50_fpn(weights="DEFAULT")
faster_rcnn.eval().to(device)

# Load Mask R-CNN Model
mask_rcnn = maskrcnn_resnet50_fpn(weights="DEFAULT")
mask_rcnn.eval().to(device)

In [None]:
# Region Growing Algorithm Implementation
def region_growing(img, seed, threshold=10):
    """Grow a region from ``seed`` over 4-connected pixels of similar intensity.

    Starting at the seed, a neighbour joins the region when its intensity
    differs from the pixel it is reached from by less than ``threshold``. The
    comparison is between adjacent pixels, not against the seed value.

    Args:
        img: Two-dimensional intensity image.
        seed: ``(row, col)`` start position inside the image.
        threshold: Exclusive upper bound for the absolute intensity difference
            between two adjacent pixels of the region.

    Returns:
        ``uint8`` array with the shape of ``img``: 255 inside the region, 0 elsewhere.

    Raises:
        IndexError: If ``seed`` lies outside the image. Negative coordinates
            are rejected as well instead of wrapping around.
    """
    import numpy as np

    rows, cols = img.shape
    seed_row, seed_col = seed
    if not (0 <= seed_row < rows and 0 <= seed_col < cols):
        raise IndexError(
            f"seed {seed!r} is outside of the image with shape {(rows, cols)}")

    segmented = np.zeros_like(img, dtype=np.uint8)
    visited = np.zeros_like(img, dtype=np.bool_)
    stack = [(seed_row, seed_col)]

    while stack:
        x, y = stack.pop()
        # A pixel can be pushed more than once before it is processed
        if visited[x, y]:
            continue

        visited[x, y] = True
        segmented[x, y] = 255
        # Convert to int once so the subtraction cannot overflow on uint8 images
        current = int(img[x, y])

        for dx, dy in ((-1, 0), (1, 0), (0, -1), (0, 1)):
            nx, ny = x + dx, y + dy
            if 0 <= nx < rows and 0 <= ny < cols and not visited[nx, ny]:
                if abs(int(img[nx, ny]) - current) < threshold:
                    stack.append((nx, ny))

    return segmented

# Apply Region Growing with a seed point near the detected object
# NOTE(review): `gray`, `image_tensor` and `image_rgb` are not defined in the
# visible cells - presumably created elsewhere; confirm before running.
seed_point = (200, 300)  # Example seed point, should be adapted based on input image
region_growing_result = region_growing(gray, seed_point)

# Run the pretrained detector without tracking gradients
with torch.no_grad():
    detections = faster_rcnn([image_tensor])

# Draw bounding boxes of all detections scoring above the threshold
threshold = 0.5
image_with_boxes = image_rgb.copy()
first_result = detections[0]
for score, box_tensor in zip(first_result['scores'], first_result['boxes']):
    if score > threshold:
        box = box_tensor.cpu().numpy().astype(int)
        top_left, bottom_right = (box[0], box[1]), (box[2], box[3])
        cv2.rectangle(image_with_boxes, top_left, bottom_right, (255, 0, 0), 2)

# Display the region-growing result
plt.figure(figsize=(6, 6))
plt.title("Region Growing Segmentation")
plt.imshow(region_growing_result, cmap="gray")
plt.axis("off")
plt.show()

# Connected Component Analysis

In [None]:
# Function to process the image in smaller patches to reduce memory load
def process_in_patches(image, patch_size=256, min_area=500):
    """Keep only large external contours, processing the image patch by patch.

    The image is split into ``patch_size`` x ``patch_size`` tiles. In every
    tile the external contours are detected and those whose area exceeds
    ``min_area`` are drawn filled (value 255) into the output. Contours are
    evaluated per tile, so the area test applies to the part of an object that
    lies inside one tile.

    Args:
        image: Image array handed tile-wise to ``cv2.findContours``.
            NOTE(review): presumably a single-channel 8-bit mask - confirm.
        patch_size: Edge length of the square tiles; must be positive.
        min_area: Contours with an area of at most this value are discarded.

    Returns:
        ``uint8`` array with the shape of ``image`` containing the filled contours.

    Raises:
        ValueError: If ``patch_size`` is not positive.
    """
    import numpy as np

    if patch_size <= 0:
        # A negative step would silently skip every tile and return an empty mask
        raise ValueError(f"patch_size must be positive, got {patch_size}")

    h, w = image.shape[:2]
    processed_image = np.zeros_like(image, dtype=np.uint8)

    for y in range(0, h, patch_size):
        for x in range(0, w, patch_size):
            # Extract patch (slicing clips automatically at the image border)
            patch = image[y:y+patch_size, x:x+patch_size]

            # Apply contour-based filtering on patch
            contours, _ = cv2.findContours(patch, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            patch_mask = np.zeros_like(patch, dtype=np.uint8)

            for contour in contours:
                if cv2.contourArea(contour) > min_area:  # Minimum threshold for objects
                    cv2.drawContours(patch_mask, [contour], -1, 255, thickness=cv2.FILLED)

            # Place processed patch back into full image
            processed_image[y:y+patch_size, x:x+patch_size] = patch_mask

    return processed_image

# Filter the region growing mask tile by tile
filtered_result_patches = process_in_patches(region_growing_result)

# Show the filtered mask on its own 8x8 inch figure without axis decorations
refined_fig, refined_ax = plt.subplots(figsize=(8, 8))
refined_ax.set_title("Refined Object Separation using Patch-based Processing")
refined_ax.imshow(filtered_result_patches, cmap="gray")
refined_ax.axis("off")
plt.show()

## Training model

In [None]:
# Feature extractor: a pretrained ResNet-18 without its last two child modules
pretrained_resnet = resnet18(weights=ResNet18_Weights.DEFAULT)
modules = list(pretrained_resnet.children())[:-2]
backbone = nn.Sequential(*modules)
# Channel count of the backbone output, stored on the module for the detector
backbone.out_channels = 512

In [None]:
# Create Anchor Generator
# One inner tuple each, because the backbone yields a single feature map
anchor_sizes = ((8, 16, 32, 64, 128, 256, 512),)
anchor_aspect_ratios = ((0.5, 1.0, 2.0),)
anchor_generator = AnchorGenerator(sizes=anchor_sizes,
                                   aspect_ratios=anchor_aspect_ratios)

In [None]:
# Initialize FasterRCNN with Backbone and AnchorGenerator
# torchvision's FasterRCNN reserves class 0 for the background and uses the
# labels as class indices, so the head needs at least `max(category_id) + 1`
# outputs. `len(cats)` alone is one short when the category ids start at 1.
num_classes = max(train_dataset.num_classes,
                  max(train_dataset.coco.cats, default=0) + 1)
model = FasterRCNN(backbone=backbone,
                   rpn_anchor_generator=anchor_generator,
                   num_classes=num_classes)
# Send model to device
model.to(device)

# Trailing print: presumably keeps the notebook from echoing the model summary
print('')

In [None]:
# Optimizer step size and maximum number of training epochs
learning_rate = 1e-4
num_epochs = 100

# Early stopping: give up after this many epochs in a row without improvement
early_stopping_counter = 0
early_stopping_tolerance = 10

In [None]:
# Define optimizer and scheduler
# Only parameters that still require gradients are handed to the optimizer
params = list(filter(lambda parameter: parameter.requires_grad, model.parameters()))
optimizer = Adam(params, lr=learning_rate)
# Each scheduler step multiplies the learning rate by 0.9
lr_scheduler = StepLR(optimizer, step_size=1, gamma=0.9)

In [None]:
# Start Training Process
# The model stays in train mode for the whole loop: torchvision detection
# models return the loss dictionary only in train mode, so the validation pass
# below relies on it too (with gradients disabled).
model.train()
# Reset the bookkeeping so that re-running this cell starts from a clean state
early_stopping_counter = 0
best_average_loss = float('inf')

for epoch in range(num_epochs):
    # Training
    for images, targets in data_loader_train:
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        sum_loss = sum(loss_dict.values())

        sum_loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    # Evaluation: average the summed loss over ALL validation batches instead of
    # keeping only the value of the last batch
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for images, targets in data_loader_test:
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            # .item() yields a Python float, so no tensors are kept between epochs
            total_loss += sum(loss_dict.values()).item()
            num_batches += 1

    if num_batches == 0:
        raise RuntimeError('data_loader_test yielded no batches; '
                           'a validation loss cannot be computed')
    average_loss = total_loss / num_batches

    # If model improved, save weights
    if best_average_loss >= average_loss:
        best_average_loss = average_loss
        early_stopping_counter = 0
        torch.save(
            model.state_dict(),
            'model.pth'
        )

    # Otherwise, reduce learning rate
    else:
        early_stopping_counter += 1
        lr_scheduler.step()

    time_id = time.strftime('%Y_%m_%d-%H_%M')
    print(f'[{time_id}] '
          f'Epoch {epoch} of {num_epochs} - Loss: {average_loss} - LR: {str(lr_scheduler.get_last_lr()[0])} '
          f'- Early Stopping: {early_stopping_counter}/{early_stopping_tolerance}')

    # Stop once the loss has not improved for `early_stopping_tolerance` epochs in a row
    if early_stopping_counter >= early_stopping_tolerance:
        break

print('Training stopped')

# if args.create_prediction_file:
#     # Create prediction file in coco format:
#     generate_prediction_file(model, data_loader_test, device, resize)