In [1]:
# Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import xml.etree.ElementTree as ET
import torch
import torchvision
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from PIL import Image
import os

In [2]:
class CustomDataset(Dataset):
    def __init__(self, dataset_path, split, transform=None):
        self.dataset_path = dataset_path
        self.split = split
        self.transform = transform
        self.images = []
        self.annotations = []

        images_dir = os.path.join(dataset_path, split, "images")
        annotations_dir = os.path.join(dataset_path, split, "annotations")

        for filename in os.listdir(images_dir):
            if filename.endswith(".jpg") or filename.endswith(".png"):
                image_path = os.path.join(images_dir, filename)
                self.images.append(image_path)

                annotation_path = os.path.join(annotations_dir, os.path.splitext(filename)[0] + ".xml")
                try:
                    tree = ET.parse(annotation_path)
                    root = tree.getroot()
                    annotation = []

                    for obj in root.findall("object"):
                        name = obj.find("name").text
                        bbox = obj.find("bndbox")
                        xmin = int(bbox.find("xmin").text)
                        ymin = int(bbox.find("ymin").text)
                        xmax = int(bbox.find("xmax").text)
                        ymax = int(bbox.find("ymax").text)
                        annotation.append((name, xmin, ymin, xmax, ymax))

                    self.annotations.append(annotation)
                except ET.ParseError:
                    print(f"Error parsing annotation file: {annotation_path}")

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        image_path = self.images[index]
        try:
            image = Image.open(image_path).convert("RGB")
            annotation = self.annotations[index]

            if self.transform:
                image = self.transform(image)

            boxes = torch.as_tensor([ann[1:] for ann in annotation], dtype=torch.float32)
            labels = torch.as_tensor([ann[0] for ann in annotation], dtype=torch.int64)

            return image, {"boxes": boxes, "labels": labels}
        except (IOError, ValueError):
            print(f"Error loading image file: {image_path}")
            return None

In [3]:
def preprocess_dataset(dataset):
    preprocessed_images = []
    preprocessed_annotations = []
    
    for image, annotation in dataset:
        if isinstance(image, torch.Tensor):
            image = transforms.ToPILImage()(image)
        else:
            image = Image.fromarray(image)
        
        image = transform(image)
        
        targets = []
        for obj in annotation:
            name, xmin, ymin, xmax, ymax = obj
            label = class_to_idx[name]
            xmin, ymin, xmax, ymax = xmin / image.shape[2], ymin / image.shape[1], xmax / image.shape[2], ymax / image.shape[1]
            targets.append([label, xmin, ymin, xmax, ymax])
        targets = torch.tensor(targets)
        
        preprocessed_images.append(image)
        preprocessed_annotations.append(targets)
    
    return preprocessed_images, preprocessed_annotations

In [4]:
# Dataset and DataLoader Creation
dataset_path = "dataset"
split = "train"

transform = transforms.Compose([
    transforms.Resize((800, 800)),  
    transforms.RandomHorizontalFlip(0.5),  
    transforms.ToTensor(),  
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  
])

dataset = CustomDataset(dataset_path, split, transform)

class_labels = set()
for _, annotation in dataset:
    for obj in annotation:
        name = obj[0]
        class_labels.add(name)

class_to_idx = {label: idx for idx, label in enumerate(class_labels)}
print("Class labels:", class_to_idx)

preprocessed_images, preprocessed_annotations = preprocess_dataset(dataset)

def collate_fn(batch):
    return tuple(zip(*batch))

train_dataset = list(zip(preprocessed_images, preprocessed_annotations))
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)

Class labels: {'largeCar': 0, 'heavyTruck': 1, 'lightTruck': 2, 'smallCar': 3}


In [5]:
# Model Definition and Training
num_classes = len(class_to_idx) + 1
model = fasterrcnn_resnet50_fpn(weights="DEFAULT")
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

optimizer = torch.optim.SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005)

num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    
    for images, targets in train_loader:
        images = list(image.to(device) for image in images)
        targets = [{'boxes': t[:, 1:], 'labels': t[:, 0].long()} for t in targets]
        
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        
        epoch_loss += losses.item()
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss/len(train_loader):.4f}")

torch.save(model.state_dict(), "trained_model.pth")

Epoch [1/10], Loss: 1.4418
Epoch [2/10], Loss: 0.3149
Epoch [3/10], Loss: 0.2285
Epoch [4/10], Loss: 0.3032
Epoch [5/10], Loss: 0.2469
Epoch [6/10], Loss: 0.1760
Epoch [7/10], Loss: 0.2289
Epoch [8/10], Loss: 0.2086
Epoch [9/10], Loss: 0.1681
Epoch [10/10], Loss: 0.1353


In [7]:
# Evaluation and Testing
model.eval()

test_dataset = CustomDataset(dataset_path, split="test", transform=transform)
test_images, test_annotations = preprocess_dataset(test_dataset)
test_dataset = list(zip(test_images, test_annotations))
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

for image, target in test_loader:
    image = image[0].unsqueeze(0).to(device)  # Add batch dimension and move to device
    with torch.no_grad():
        predictions = model(image)
    
    boxes = predictions[0]['boxes'].cpu().numpy()
    labels = predictions[0]['labels'].cpu().numpy()
    scores = predictions[0]['scores'].cpu().numpy()
    
    confidence_threshold = 0.5
    mask = scores >= confidence_threshold
    boxes = boxes[mask]
    labels = labels[mask]
    scores = scores[mask]
    
    image = image.squeeze(0).cpu().numpy().transpose((1, 2, 0))  # Remove batch dimension and change shape to [H, W, C]
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    
    for box, label, score in zip(boxes, labels, scores):
        xmin, ymin, xmax, ymax = box.astype(int)
        label_name = list(class_to_idx.keys())[list(class_to_idx.values()).index(label)]
        
        cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
        cv2.putText(image, f"{label_name}: {score:.2f}", (xmin, ymin - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
    
    cv2.imshow("Object Detection", image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

In [1]:
# Video Object Detection
model.eval()

input_video_path = "bridge_1.mp4"
input_video = cv2.VideoCapture(input_video_path)

while True:
    ret, frame = input_video.read()
    if not ret:
        break

    pil_image = Image.fromarray(frame)
    image = transform(pil_image).unsqueeze(0).to(device)

    with torch.no_grad():
        predictions = model(image)

    boxes = predictions[0]['boxes'].cpu().numpy()
    labels = predictions[0]['labels'].cpu().numpy()
    scores = predictions[0]['scores'].cpu().numpy()

    confidence_threshold = 0.5
    mask = scores >= confidence_threshold
    boxes = boxes[mask]
    labels = labels[mask]
    scores = scores[mask]

    for box, label, score in zip(boxes, labels, scores):
        xmin, ymin, xmax, ymax = box.astype(int)
        label_name = list(class_to_idx.keys())[list(class_to_idx.values()).index(label)]

        cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
        cv2.putText(frame, f"{label_name}: {score:.2f}", (xmin, ymin - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

    cv2.imshow("Object Detection", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

input_video.release()
cv2.destroyAllWindows()

NameError: name 'model' is not defined

In [None]:
def slice_video_into_frames(video_path, output_dir):
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_path = os.path.join(output_dir, f"frame_{frame_count:04d}.jpg")
        cv2.imwrite(frame_path, frame)
        frame_count += 1
    
    cap.release()
    return fps

input_video_path = ""

output_frames_dir = ""

fps = slice_video_into_frames(input_video_path, output_frames_dir)

print(f"Video sliced into frames at {fps} FPS. Frames saved in {output_frames_dir}.")

In [1]:
def preprocess_frame(frame_path, transform):
    frame = Image.open(frame_path).convert("RGB")
    preprocessed_frame = transform(frame)
    return preprocessed_frame

def detect_objects(model, frame_path, transform, device):
    preprocessed_frame = preprocess_frame(frame_path, transform)
    input_tensor = preprocessed_frame.unsqueeze(0).to(device)

    with torch.no_grad():
        predictions = model(input_tensor)

    return predictions

frames_dir = ""

model_path = ""

model = torch.load(model_path)
model.eval()

transform = transforms.Compose([
    transforms.Resize((800, 800)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for frame_name in os.listdir(frames_dir):
    frame_path = os.path.join(frames_dir, frame_name)
    predictions = detect_objects(model, frame_path, transform, device)
    print(f"Processed frame: {frame_name}")

NameError: name 'torch' is not defined

In [2]:
def count_vehicles(predictions, confidence_threshold=0.5):
    vehicle_count = 0
    vehicle_boxes = []

    boxes = predictions[0]['boxes'].cpu().numpy()
    scores = predictions[0]['scores'].cpu().numpy()
    labels = predictions[0]['labels'].cpu().numpy()

    for box, score, label in zip(boxes, scores, labels):
        if score >= confidence_threshold and label == 1:  
            vehicle_count += 1
            vehicle_boxes.append(box)

    return vehicle_count, vehicle_boxes

vehicle_counts = []
vehicle_locations = []

for frame_name in os.listdir(frames_dir):
    frame_path = os.path.join(frames_dir, frame_name)
    predictions = detect_objects(model, frame_path, transform, device)

    count, boxes = count_vehicles(predictions, confidence_threshold=0.5)
    vehicle_counts.append(count)
    vehicle_locations.append(boxes)

    print(f"Processed frame: {frame_name}, Vehicle count: {count}")

total_vehicles = sum(vehicle_counts)
print(f"Total vehicles detected: {total_vehicles}")
print("Vehicle locations:")
for i, (count, boxes) in enumerate(zip(vehicle_counts, vehicle_locations)):
    print(f"Frame {i}: Count: {count}, Boxes: {boxes}")

NameError: name 'os' is not defined

In [3]:
def extract_vehicle_images(frame_path, vehicle_boxes, output_dir):
    frame = cv2.imread(frame_path)
    vehicle_images = []

    for i, box in enumerate(vehicle_boxes):
        xmin, ymin, xmax, ymax = box.astype(int)
        vehicle_image = frame[ymin:ymax, xmin:xmax]
        vehicle_image_path = os.path.join(output_dir, f"vehicle_{i}.jpg")
        cv2.imwrite(vehicle_image_path, vehicle_image)
        vehicle_images.append(vehicle_image_path)

    return vehicle_images

vehicle_images_dir = ""

if not os.path.exists(vehicle_images_dir):
    os.makedirs(vehicle_images_dir)

vehicle_image_paths = []

for frame_name, boxes in zip(os.listdir(frames_dir), vehicle_locations):
    frame_path = os.path.join(frames_dir, frame_name)
    vehicle_images = extract_vehicle_images(frame_path, boxes, vehicle_images_dir)
    vehicle_image_paths.extend(vehicle_images)

    print(f"Extracted vehicle images for frame: {frame_name}")

print("Extracted vehicle images:")
for path in vehicle_image_paths:
    print(path)

NameError: name 'os' is not defined

In [4]:
def preprocess_vehicle_image(image_path, transform):
    image = Image.open(image_path).convert("RGB")
    preprocessed_image = transform(image)
    return preprocessed_image

def classify_vehicle(model, image_path, transform, device):
    preprocessed_image = preprocess_vehicle_image(image_path, transform)
    input_tensor = preprocessed_image.unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = model(input_tensor)
        _, predicted_class = torch.max(outputs, 1)

    return predicted_class.item()

resnet_model = models.resnet50(pretrained=True)
num_classes = 4  
num_features = resnet_model.fc.in_features
resnet_model.fc = torch.nn.Linear(num_features, num_classes)
resnet_model.to(device)
resnet_model.eval()

resnet_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

vehicle_classifications = []

for image_path in vehicle_image_paths:
    predicted_class = classify_vehicle(resnet_model, image_path, resnet_transform, device)
    vehicle_classifications.append(predicted_class)

    print(f"Classified vehicle: {image_path}, Class: {predicted_class}")

print("Vehicle classifications using ResNet:")
for image_path, classification in zip(vehicle_image_paths, vehicle_classifications):
    print(f"Image: {image_path}, Class: {classification}")

NameError: name 'models' is not defined

In [5]:
from torchmetrics import Precision, Recall, F1Score, Accuracy

class_labels = {
    0: 'largeCar',
    1: 'smallCar',
    2: 'heavyTruck',
    3: 'lightTruck'
}

predicted_labels = [class_labels[cls] for cls in vehicle_classifications]

ground_truth_labels = [
    'largeCar',
    'smallCar',
    'heavyTruck',
    'lightTruck',
    'smallCar',
    'largeCar',
    'heavyTruck',
    'largeCar',
    'lightTruck',
    'smallCar'
]

ground_truth_indices = [list(class_labels.values()).index(label) for label in ground_truth_labels]

predicted_labels_tensor = torch.tensor(vehicle_classifications)
ground_truth_indices_tensor = torch.tensor(ground_truth_indices)

precision = Precision(num_classes=len(class_labels), average='macro')(predicted_labels_tensor, ground_truth_indices_tensor)
recall = Recall(num_classes=len(class_labels), average='macro')(predicted_labels_tensor, ground_truth_indices_tensor)
f1_score = F1Score(num_classes=len(class_labels), average='macro')(predicted_labels_tensor, ground_truth_indices_tensor)
accuracy = Accuracy()(predicted_labels_tensor, ground_truth_indices_tensor)

print("Vehicle Classification Evaluation:")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1_score:.4f}")
print(f"Accuracy: {accuracy:.4f}")

ModuleNotFoundError: No module named 'torchmetrics'

In [6]:
def draw_bounding_boxes(image, boxes, labels, scores, class_labels, confidence_threshold=0.5):
    for box, label, score in zip(boxes, labels, scores):
        if score >= confidence_threshold:
            xmin, ymin, xmax, ymax = box.astype(int)
            class_name = class_labels[label]
            
            cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
            cv2.putText(image, f"{class_name}: {score:.2f}", (xmin, ymin - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
    
    return image

processed_frames_dir = ""

labeled_frames_dir = ""
os.makedirs(labeled_frames_dir, exist_ok=True)

for frame_name, boxes, labels, scores in zip(os.listdir(frames_dir), vehicle_locations, vehicle_classifications, vehicle_scores):
    frame_path = os.path.join(frames_dir, frame_name)
    frame = cv2.imread(frame_path)
    
    labeled_frame = draw_bounding_boxes(frame, boxes, labels, scores, class_labels)
    
    labeled_frame_path = os.path.join(labeled_frames_dir, frame_name)
    cv2.imwrite(labeled_frame_path, labeled_frame)
    
    print(f"Labeled frame: {frame_name}")

print("Labeling completed.")

NameError: name 'os' is not defined

In [None]:
def rebuild_video_from_frames(frames_dir, output_path, fps):
    frame_files = sorted(os.listdir(frames_dir), key=lambda x: int(x.split('_')[1].split('.')[0]))
    
    if len(frame_files) == 0:
        print("No frames found in the directory.")
        return
    
    first_frame_path = os.path.join(frames_dir, frame_files[0])
    first_frame = cv2.imread(first_frame_path)
    height, width, _ = first_frame.shape
    
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    for frame_file in frame_files:
        frame_path = os.path.join(frames_dir, frame_file)
        frame = cv2.imread(frame_path)
        video_writer.write(frame)
    
    video_writer.release()
    
    print("Video reconstruction completed.")

labeled_frames_dir = ""

output_video_path = ""

output_fps = 30

rebuild_video_from_frames(labeled_frames_dir, output_video_path, output_fps)