In [20]:
import torch
import torchvision
from torch.utils.data import DataLoader,Dataset
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.datasets import CocoDetection
from torchvision.transforms import functional as F
import matplotlib.pyplot as plt
from PIL import Image
import os

In [23]:
class CustomDataset(Dataset):
    """Detection dataset pairing each image with a same-named ``.txt`` label file.

    Every non-blank line of a label file is read as five whitespace-separated
    numbers: ``label x_min y_min x_max y_max``.

    Args:
        image_dir: Directory containing the images.
        label_dir: Directory containing one ``<image stem>.txt`` file per image.
        transforms: Optional callable applied to the loaded PIL image.
    """

    # Only files with these extensions are treated as samples, so stray files
    # in image_dir (thumbnail caches, hidden files, ...) are never opened.
    IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff")

    def __init__(self, image_dir, label_dir, transforms=None):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transforms = transforms
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> sample mapping stable between runs.
        self.images = sorted(
            name
            for name in os.listdir(image_dir)
            if name.lower().endswith(self.IMAGE_EXTENSIONS)
        )

    def __len__(self):
        return len(self.images)

    def parse_label(self, label_path):
        """Read one label file.

        Returns:
            ``(boxes, labels)`` where ``boxes`` is a list of
            ``[x_min, y_min, x_max, y_max]`` floats and ``labels`` a list of ints.

        Raises:
            ValueError: If a non-blank line does not hold exactly five numbers.
        """
        boxes = []
        labels = []
        with open(label_path, "r") as f:
            for line_no, line in enumerate(f, start=1):
                fields = line.split()
                if not fields:
                    # Blank lines (e.g. a trailing newline) carry no annotation.
                    continue
                if len(fields) != 5:
                    raise ValueError(
                        f"{label_path}:{line_no}: expected 5 fields "
                        f"(label x_min y_min x_max y_max), got {len(fields)}"
                    )
                label, x_min, y_min, x_max, y_max = map(float, fields)
                boxes.append([x_min, y_min, x_max, y_max])
                labels.append(int(label))
        return boxes, labels

    def __getitem__(self, idx):
        """Return ``(image, target)`` with ``target = {"boxes", "labels"}``."""
        image_name = self.images[idx]

        # Load image; force three channels, matching what the inference
        # cells of this notebook do before calling the model.
        img_path = os.path.join(self.image_dir, image_name)
        img = Image.open(img_path).convert("RGB")

        # Load label
        label_path = os.path.join(
            self.label_dir, os.path.splitext(image_name)[0] + ".txt"
        )
        boxes, labels = self.parse_label(label_path)

        # Convert to tensors. An empty annotation list must still produce a
        # (0, 4) box tensor rather than a 1-D empty tensor.
        if boxes:
            boxes = torch.as_tensor(boxes, dtype=torch.float32)
        else:
            boxes = torch.zeros((0, 4), dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        # Target dictionary
        target = {"boxes": boxes, "labels": labels}

        # Apply transforms
        if self.transforms:
            img = self.transforms(img)

        return img, target


In [24]:
# Define transforms
def get_transforms():
    """Build the image preprocessing pipeline: a single ToTensor step."""
    to_tensor = torchvision.transforms.ToTensor()
    pipeline = torchvision.transforms.Compose([to_tensor])
    return pipeline


# Load dataset
# Paths are assembled with os.path.join instead of hard-coded backslashes so
# the same cell resolves the directories on any operating system.
DATASET_ROOT = os.path.join("rcnn_train", "dataset_rcnn")

train_dataset = CustomDataset(
    image_dir=os.path.join(DATASET_ROOT, "images", "train"),
    label_dir=os.path.join(DATASET_ROOT, "labels", "train"),
    transforms=get_transforms(),
)
val_dataset = CustomDataset(
    image_dir=os.path.join(DATASET_ROOT, "images", "val"),
    label_dir=os.path.join(DATASET_ROOT, "labels", "val"),
    transforms=get_transforms(),
)

In [25]:
def collate_detection_batch(batch):
    """Regroup a list of ``(image, target)`` samples into ``(images, targets)``.

    Images are kept as a tuple rather than stacked, so samples of different
    sizes can share a batch. A named function (unlike a lambda) can also be
    pickled, which DataLoader needs when worker processes are spawned.
    """
    return tuple(zip(*batch))


train_loader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=collate_detection_batch,
    pin_memory=True,
)
# Validation order does not need to be randomised; keeping it fixed makes
# evaluation runs comparable.
val_loader = DataLoader(
    val_dataset,
    batch_size=4,
    shuffle=False,
    collate_fn=collate_detection_batch,
    pin_memory=True,
)

In [39]:
# Load Faster R-CNN with a MobileNetV3-Large FPN backbone
def get_model(num_classes, checkpoint_path=None):
    """Build the detector and optionally restore weights from a checkpoint.

    Args:
        num_classes: Number of output classes of the new box predictor
            (the caller counts the background class in this number).
        checkpoint_path: Optional path to a saved ``state_dict``. Falsy values
            (``None`` or ``''``) skip loading.

    Returns:
        The model with its box-predictor head replaced.
    """
    # Load pre-trained Faster R-CNN
    # NOTE(review): newer torchvision releases deprecate `pretrained=` in
    # favour of `weights=` - confirm against the installed version.
    model = torchvision.models.detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=True)

    # Get the number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features

    # Replace the pre-trained head with a new one for the specified number of classes
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    # Optionally load weights from a checkpoint
    if checkpoint_path:
        # map_location="cpu" lets a checkpoint saved from a GPU be restored on
        # a machine without CUDA; the caller moves the model to its device.
        state_dict = torch.load(checkpoint_path, map_location="cpu")
        model.load_state_dict(state_dict)
        print(f"Model weights loaded from {checkpoint_path}")

    return model

In [None]:
# Initialize the model
# Three output classes: background plus the two object classes (rocks, shadow).
num_classes = 3
# An empty path means "do not restore a checkpoint"; swap in the line below to resume.
checkpoint_path = ''
# checkpoint_path=r'rcnn_train\models\fasterrcnn_resnet50_epoch_5.pth'
model = get_model(num_classes=num_classes, checkpoint_path=checkpoint_path)

In [41]:
# Move model to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Define optimizer and learning rate scheduler
# Only parameters that still require gradients are handed to the optimizer.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.002,
    momentum=0.9,
    weight_decay=0.0002,
)
# The learning rate is multiplied by 0.1 every 2 scheduler steps.
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=2,
    gamma=0.1,
)


In [42]:
import time

def train_one_epoch(model, optimizer, data_loader, device, epoch):
    """Train ``model`` for one pass over ``data_loader``.

    Boxes without positive width and height are dropped; an image whose boxes
    are all dropped (or that has none) is left out of the batch, and a batch
    with no remaining image is skipped entirely.

    Args:
        model: Callable as ``model(images, targets)`` in training mode,
            returning a dict of loss tensors.
        optimizer: Optimizer stepping the model parameters.
        data_loader: Iterable of ``(images, targets)`` batches supporting
            ``len()``; each target is a dict with tensor entries ``"boxes"``
            (``[x_min, y_min, x_max, y_max]`` rows) and ``"labels"``.
        device: Device the images and targets are moved to.
        epoch: Epoch number, used only for log messages.

    Returns:
        The mean loss over the batches that were trained on, or ``None`` when
        no batch contained a valid target.
    """
    model.train()

    print(f"Starting training for Epoch {epoch}...")
    start_time = time.time()
    total_batches = len(data_loader)

    # Per-batch bookkeeping for the time estimate and the epoch summary
    batch_times = []
    batch_losses = []

    for batch_idx, (images, targets) in enumerate(data_loader):
        batch_start_time = time.time()

        # Keep images and targets aligned: both are appended together or not at all
        valid_images = []
        processed_targets = []

        for image, target in zip(images, targets):
            boxes = target["boxes"]
            labels = target["labels"]

            # An image without annotations has nothing to validate
            if boxes.numel() == 0:
                continue
            boxes = boxes.reshape(-1, 4)

            # Vectorised check for positive width and height
            keep = (boxes[:, 2] > boxes[:, 0]) & (boxes[:, 3] > boxes[:, 1])
            if not bool(keep.any()):
                continue

            processed_targets.append(
                {
                    "boxes": boxes[keep].to(device=device, dtype=torch.float32),
                    "labels": labels.reshape(-1)[keep].to(device=device, dtype=torch.int64),
                }
            )
            valid_images.append(image.to(device))

        # Skip iteration if no valid targets
        if not processed_targets:
            continue

        # Forward pass
        loss_dict = model(valid_images, processed_targets)
        losses = sum(loss for loss in loss_dict.values())

        # Backpropagation
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        loss_value = losses.item()
        batch_losses.append(loss_value)

        # Record batch time
        batch_time = time.time() - batch_start_time
        batch_times.append(batch_time)

        # Estimate time left from the average duration of the trained batches
        avg_batch_time = sum(batch_times) / len(batch_times)
        batches_left = total_batches - (batch_idx + 1)
        time_left = avg_batch_time * batches_left

        # Log information
        print(
            f"Epoch [{epoch}] | Batch [{batch_idx + 1}/{total_batches}] | "
            f"Loss: {loss_value:.4f} | Batch Time: {batch_time:.2f}s | Time Left: {time_left:.2f}s"
        )

    total_time = time.time() - start_time

    # Without this guard the summary would reference a loss that was never
    # computed when every batch was skipped or the loader was empty.
    if not batch_losses:
        print(f"Epoch [{epoch}] completed in {total_time:.2f}s; no batch contained a valid target.")
        return None

    print(f"Epoch [{epoch}] completed in {total_time:.2f}s with final Loss: {batch_losses[-1]:.4f}")
    return sum(batch_losses) / len(batch_losses)


In [None]:
# Training loop
num_epochs = 5
model_dir = os.path.join("rcnn_train", "models")
os.makedirs(model_dir, exist_ok=True)

for epoch in range(num_epochs):
    train_one_epoch(model, optimizer, train_loader, device, epoch)
    lr_scheduler.step()

    # Save the model's state dictionary after every epoch.
    # The path is joined from parts: inside a normal (non-raw) string literal
    # a backslash followed by "f" is a form-feed escape, which would corrupt
    # the checkpoint file name.
    model_path = os.path.join(model_dir, f"fasterrcnn_resnet50_epoch_{epoch + 1}.pth")
    torch.save(model.state_dict(), model_path)
    print(f"Model saved: {model_path}")


Output

In [None]:
import torch
import torchvision
from torch.utils.data import DataLoader
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.datasets import CocoDetection
from torchvision.transforms import functional as F
import matplotlib.pyplot as plt
from PIL import Image

# Load Faster R-CNN with a MobileNetV3-Large FPN backbone
def get_model(num_classes, checkpoint_path=None):
    """Build the detector and optionally restore weights from a checkpoint.

    The optional ``checkpoint_path`` keeps this definition call-compatible
    with the two-argument ``get_model`` used by the training cells, so
    running this cell does not break them.

    Args:
        num_classes: Number of output classes of the new box predictor.
        checkpoint_path: Optional path to a saved ``state_dict``; falsy values
            skip loading.

    Returns:
        The model with its box-predictor head replaced.
    """
    # Load pre-trained Faster R-CNN
    model = torchvision.models.detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=True)
    # Get the number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # Replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    if checkpoint_path:
        # Load onto the CPU first so GPU-saved checkpoints work without CUDA
        model.load_state_dict(torch.load(checkpoint_path, map_location="cpu"))
        print(f"Model weights loaded from {checkpoint_path}")
    return model


# Initialize the model
num_classes = 3  # Background + rock+shadow

# Move model to GPU if available
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


# Load the trained model
model = get_model(num_classes)
# Forward slashes are used on purpose: in a normal string literal a backslash
# followed by "f" is a form-feed escape and would corrupt the file name.
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
checkpoint_file = "rcnn_train/models/fasterrcnn_resnet50_epoch_5.pth"
model.load_state_dict(torch.load(checkpoint_file, map_location=device))
model.to(device)
model.eval()  # Set the model to evaluation mode


def prepare_image(image_path):
    """Load ``image_path`` as an RGB tensor with a leading batch axis on ``device``."""
    rgb_image = Image.open(image_path).convert("RGB")
    # Indexing with None prepends the batch dimension
    batch = F.to_tensor(rgb_image)[None]
    return batch.to(device)


# Load the unseen image (fill in image_path before running this cell)
# image_path = r"C:\Users\Samarth\Desktop\polar3D\rcnn\dataset\images\train\01_A_off_30_R_0256.png"
image_path = ""
image_tensor = prepare_image(image_path)

# Inference only, so gradient tracking is switched off for the forward pass
with torch.no_grad():
    prediction = model(image_tensor)

print(prediction)

# `prediction` contains:
# - boxes: predicted bounding boxes
# - labels: predicted class labels
# - scores: predicted scores for each box (confidence level)
# Class id -> display name for the three classes of this detector
COCO_CLASSES = dict(enumerate(("background", "rock", "shadow")))

def get_class_name(class_id):
    """Return the display name for ``class_id``, or "Unknown" if it is not mapped."""
    try:
        return COCO_CLASSES[class_id]
    except KeyError:
        return "Unknown"
    
# Draw bounding boxes with the correct class names and increase image size
def draw_boxes(image, prediction, fig_size=(10, 10), threshold=0.5):
    """Show ``image`` with the confident label-1 detections outlined in red.

    Args:
        image: Image accepted by ``imshow``.
        prediction: Model output; only ``prediction[0]`` is read, through its
            ``'boxes'``, ``'labels'`` and ``'scores'`` tensors.
        fig_size: Matplotlib figure size.
        threshold: Detections with a score at or below this value are hidden.
    """
    boxes = prediction[0]['boxes'].cpu().numpy()  # Get predicted bounding boxes
    labels = prediction[0]['labels'].cpu().numpy()  # Get predicted labels
    scores = prediction[0]['scores'].cpu().numpy()  # Get predicted scores

    # Set up the figure size to control the image size
    plt.figure(figsize=fig_size)
    ax = plt.gca()

    # Draw the image once, up front: it must be visible even when no
    # detection passes the filter, and need not be redrawn for every box.
    ax.imshow(image)

    for box, label, score in zip(boxes, labels, scores):
        if score > threshold and label == 1:
            x_min, y_min, x_max, y_max = box
            class_name = get_class_name(label)  # Get the class name
            ax.add_patch(plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min,
                                       linewidth=2, edgecolor='r', facecolor='none'))
            ax.text(x_min, y_min, f"{class_name} ({score:.2f})", color='r')

    ax.axis('off')  # Turn off axis
    plt.show()

# Display the image with bounding boxes and correct labels (larger figure than the default)
source_image = Image.open(image_path)
draw_boxes(source_image, prediction, fig_size=(12, 10))


In [None]:
import torch
import torchvision
from torch.utils.data import DataLoader
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.transforms import functional as F
import matplotlib.pyplot as plt
from PIL import Image
import random
import cv2
import os
import numpy as np

# Load Faster R-CNN with a MobileNetV3-Large FPN backbone
def get_model(num_classes, checkpoint_path=None):
    """Build the detector and optionally restore weights from a checkpoint.

    ``checkpoint_path`` is optional so that this redefinition stays
    call-compatible with the two-argument ``get_model`` of the training cells.

    Args:
        num_classes: Number of output classes of the new box predictor.
        checkpoint_path: Optional path to a saved ``state_dict``; falsy values
            skip loading.

    Returns:
        The model with its box-predictor head replaced.
    """
    # Load pre-trained Faster R-CNN
    model = torchvision.models.detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=True)
    # Get the number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # Replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    if checkpoint_path:
        # Load onto the CPU first so GPU-saved checkpoints work without CUDA
        model.load_state_dict(torch.load(checkpoint_path, map_location="cpu"))
        print(f"Model weights loaded from {checkpoint_path}")
    return model

# Initialize the model
num_classes = 3  # Background + rock + shadow

# Move model to GPU if available
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Load the trained model
model = get_model(num_classes)
# map_location remaps the stored tensors to the selected device, so a
# checkpoint written on a GPU also loads on a CPU-only machine.
checkpoint_file = r"C:\Users\Samarth\Desktop\polar3D\rcnn\models1\fasterrcnn_resnet50_epoch_19.pth"
model.load_state_dict(torch.load(checkpoint_file, map_location=device))
model.to(device)
model.eval()  # Set the model to evaluation mode

def prepare_image(image_path):
    """Read an image file, force RGB, and return it as a 1-image batch tensor on ``device``."""
    with_batch_axis = torch.unsqueeze(
        F.to_tensor(Image.open(image_path).convert("RGB")),
        0,
    )
    return with_batch_axis.to(device)

def process_random_images(img_dir, num_images=10):
    """Pick ``num_images`` random ``.png`` files from ``img_dir`` and load them.

    The function is best-effort: problems are printed rather than raised.

    Args:
        img_dir: Directory to sample from.
        num_images: Number of images to select.

    Returns:
        A list of ``(image_name, image_tensor)`` pairs. The list is empty when
        the directory cannot be read or holds fewer than ``num_images`` PNG
        files; an image that fails to load is reported and left out.
    """
    try:
        # Get all image files from the directory (sorted, so that a seeded
        # random.sample selects the same files on every run)
        image_files = sorted(
            f for f in os.listdir(img_dir) if f.lower().endswith(".png")
        )

        if len(image_files) < num_images:
            raise ValueError(
                f"Not enough images in the directory to select {num_images}. Found only {len(image_files)}."
            )

        # Select random images
        selected_images = random.sample(image_files, num_images)
    except (OSError, ValueError) as e:
        print(f"Error: {e}")
        return []

    # Process each selected image
    processed_images = []
    for image_name in selected_images:
        image_path = os.path.join(img_dir, image_name)
        print(f"Processing image: {image_name}")
        try:
            processed_image = prepare_image(image_path)
        except Exception as e:
            # prepare_image signals failure by raising, not by returning
            # None; one bad file must not discard the images already loaded.
            print(f"Failed to process image: {image_name} ({e})")
            continue
        processed_images.append((image_name, processed_image))

    return processed_images

# Load and process unseen images
# image_dir = r"C:\Users\Samarth\Desktop\polar3D\rcnn\dataset\images\test"
# result_dir = r"C:\Users\Samarth\Desktop\polar3D\results\rcnn\images"
image_dir = ""
result_dir = ""
# os.makedirs("") raises FileNotFoundError, so the directory is only created
# when a path is configured; with an empty result_dir, os.path.join writes
# the outputs relative to the current working directory.
if result_dir:
    os.makedirs(result_dir, exist_ok=True)
processed_images = process_random_images(image_dir, num_images=10)

# Run the detector on every loaded image and save, per image, a label file
# and an annotated copy of the image.
# NOTE(review): get_class_name is not defined in this cell; it has to exist
# in the kernel already (it is defined in other cells of this notebook).
for image_name, image_tensor in processed_images:
    # Convert tensor to an HxWxC uint8 NumPy array for OpenCV. permute() only
    # rearranges strides, so an explicit contiguous copy is taken before the
    # array is handed to the cv2 drawing functions.
    image_np = image_tensor.squeeze(0).permute(1, 2, 0).cpu().numpy()
    image_np = np.ascontiguousarray((image_np * 255).astype(np.uint8))  # Rescale to [0, 255]

    with torch.no_grad():
        prediction = model(image_tensor)

    boxes = prediction[0]['boxes'].cpu().numpy()
    labels = prediction[0]['labels'].cpu().numpy()
    scores = prediction[0]['scores'].cpu().numpy()

    rock_count = 0
    # Swap only the extension: str.replace("png", "txt") would also rewrite
    # "png" inside the stem and leaves an upper-case ".PNG" untouched, in
    # which case the label file and the image would share one path.
    label_name = os.path.splitext(image_name)[0] + ".txt"
    output_label_path = os.path.join(result_dir, label_name)

    with open(output_label_path, 'w') as file:
        for box, label, score in zip(boxes, labels, scores):
            if score <= 0.5:  # Only include confident predictions
                continue

            x_min, y_min, x_max, y_max = box
            class_name = get_class_name(label)
            file.write(f"{class_name} {x_min:.6f} {y_min:.6f} {x_max:.6f} {y_max:.6f}\n")

            if label == 1:  # Count and draw rocks only
                rock_count += 1
                x1, y1 = int(x_min), int(y_min)
                x2, y2 = int(x_max), int(y_max)
                cv2.rectangle(image_np, (x1, y1), (x2, y2), (255, 0, 0), 2)
                cv2.putText(
                    image_np,
                    class_name,
                    (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5,
                    (255, 0, 0),
                    2,
                )

    # Everything below belongs to the current image and therefore stays inside
    # the loop; otherwise only the last image would ever be written.
    cv2.putText(
        image_np,
        f"Total Count: {rock_count}",
        (10, 30),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (255, 0, 0),
        2,
    )

    output_image_path = os.path.join(result_dir, image_name)
    # image_np is in RGB channel order (it comes from an RGB tensor) while
    # cv2.imwrite expects BGR, so the channels are swapped on the way out.
    cv2.imwrite(output_image_path, cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR))

    print(f"YOLO labels saved to: {output_label_path}")
    print(f"Image with bounding boxes saved to: {output_image_path}")

In [None]:
import os
import random
import torch
import torchvision
from torch.utils.data import DataLoader
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.transforms import functional as F
import matplotlib.pyplot as plt
from PIL import Image

# Load Faster R-CNN with a MobileNetV3-Large FPN backbone
def get_model(num_classes, checkpoint_path=None):
    """Build the detector and optionally restore weights from a checkpoint.

    ``checkpoint_path`` is optional so that this redefinition stays
    call-compatible with the two-argument ``get_model`` of the training cells.

    Args:
        num_classes: Number of output classes of the new box predictor.
        checkpoint_path: Optional path to a saved ``state_dict``; falsy values
            skip loading.

    Returns:
        The model with its box-predictor head replaced.
    """
    model = torchvision.models.detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    if checkpoint_path:
        # Load onto the CPU first so GPU-saved checkpoints work without CUDA
        model.load_state_dict(torch.load(checkpoint_path, map_location="cpu"))
        print(f"Model weights loaded from {checkpoint_path}")
    return model

# Initialize the model
num_classes = 3  # Background + rock + shadow
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Load the trained model
model = get_model(num_classes)
# map_location remaps the stored tensors to the selected device, so a
# checkpoint written on a GPU also loads on a CPU-only machine.
checkpoint_file = r"C:\Users\Samarth\Desktop\polar3D\rcnn\models2\fasterrcnn_resnet50_epoch_5.pth"
model.load_state_dict(torch.load(checkpoint_file, map_location=device))
model.to(device)
model.eval()

# Prepare an image
def prepare_image(image_path):
    """Open ``image_path`` as RGB and return a batched tensor placed on ``device``."""
    pil_image = Image.open(image_path)
    pil_image = pil_image.convert("RGB")
    single = F.to_tensor(pil_image)
    # Add the batch dimension in front
    return single.unsqueeze(dim=0).to(device)

# Define COCO_CLASSES (class id -> display name)
COCO_CLASSES = {
    class_id: name
    for class_id, name in enumerate(["background", "rock", "shadow"])
}

# Get class name
def get_class_name(class_id):
    """Look up the display name of ``class_id``; unmapped ids give "Unknown"."""
    if class_id in COCO_CLASSES:
        return COCO_CLASSES[class_id]
    return "Unknown"

# Draw bounding boxes and save images and labels
def draw_and_save_boxes(image, prediction, output_image_path, output_label_path, threshold=0.5):
    """Save an annotated copy of ``image`` and a label file for its detections.

    Every detection scoring above ``threshold`` is written to the label file;
    only those with label 1 are drawn on the image and counted.

    Args:
        image: Image accepted by ``imshow``.
        prediction: Model output; only ``prediction[0]`` is read, through its
            ``'boxes'``, ``'labels'`` and ``'scores'`` tensors.
        output_image_path: Destination of the annotated figure.
        output_label_path: Destination of the label text file, one line per
            detection: ``class_id x_min y_min x_max y_max``.
        threshold: Minimum score (exclusive) for a detection to be kept.
    """
    boxes = prediction[0]['boxes'].cpu().numpy()
    labels = prediction[0]['labels'].cpu().numpy()
    scores = prediction[0]['scores'].cpu().numpy()

    label_data = []
    rock_count = 0

    # Save image with bounding boxes
    fig, ax = plt.subplots(figsize=(12, 10))
    try:
        ax.imshow(image)

        for box, label, score in zip(boxes, labels, scores):
            if score <= threshold:
                continue

            # Plain Python numbers keep the label file independent of how
            # NumPy scalars happen to be printed.
            x_min, y_min, x_max, y_max = (float(v) for v in box)
            label_data.append((int(label), x_min, y_min, x_max, y_max))

            if label == 1:
                class_name = get_class_name(label)
                ax.add_patch(plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min,
                                           linewidth=2, edgecolor='r', facecolor='none'))
                ax.text(x_min, y_min, f"{class_name} ({score:.2f})", color='r', fontsize=12)
                if class_name == "rock":
                    rock_count += 1

        ax.text(10, 10, f"Total Rocks: {rock_count}", color='white', fontsize=16, bbox=dict(facecolor='red', alpha=0.5))
        ax.axis('off')
        fig.savefig(output_image_path, bbox_inches='tight')
    finally:
        # Release the figure even when drawing or saving fails, so figures do
        # not pile up while many images are processed.
        plt.close(fig)

    # Save label data as whitespace-separated numbers, one detection per line
    with open(output_label_path, 'w') as f:
        for class_id, x_min, y_min, x_max, y_max in label_data:
            f.write(f"{class_id} {x_min:.6f} {y_min:.6f} {x_max:.6f} {y_max:.6f}\n")

# Process the images of a directory
def process_images(input_dir, output_dir, num_images):
    """Run the detector on every image in ``input_dir`` and save the results.

    For each ``.png``/``.jpg``/``.jpeg`` file (extension matched without
    regard to case) a ``<stem>_result.png`` and a ``<stem>_labels.txt`` are
    written to ``output_dir``.

    Args:
        input_dir: Directory holding the input images.
        output_dir: Directory for the results; created if missing.
        num_images: Currently unused - all images are processed. The
            parameter is kept so existing calls keep working.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Sorted so the processing order is the same on every run
    image_paths = sorted(
        os.path.join(input_dir, file)
        for file in os.listdir(input_dir)
        if file.lower().endswith(('.png', '.jpg', '.jpeg'))
    )

    for image_path in image_paths:
        stem = os.path.splitext(os.path.basename(image_path))[0]
        output_image_path = os.path.join(output_dir, f"{stem}_result.png")
        output_label_path = os.path.join(output_dir, f"{stem}_labels.txt")

        image_tensor = prepare_image(image_path)

        with torch.no_grad():
            prediction = model(image_tensor)

        # The context manager closes the image file once it has been drawn
        with Image.open(image_path) as image:
            draw_and_save_boxes(image, prediction, output_image_path, output_label_path)

# Input and output directories
input_dir = r"C:\Users\Samarth\Desktop\polar3D\dataset_resized1_copy\images\test"
output_dir = r"C:\Users\Samarth\Desktop\polar3D\results\rcnn\images"

# Process every image in input_dir. The count is taken from the directory
# listing; len(input_dir) would be the number of characters in the path.
process_images(input_dir, output_dir, num_images=len(os.listdir(input_dir)))
