<a href="https://colab.research.google.com/github/Andyqballer/Object-Detection-With-YOlO5-And-Faster-RCNN/blob/main/Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import requests
import torch
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from pycocotools.coco import COCO
import cv2
from PIL import Image
from tqdm import tqdm
import yaml
import zipfile
import subprocess
from pathlib import Path
import numpy as np

class ObjectDetectionTrainer:
    def __init__(self, root_dir='/content/coco_subset', num_images=100):
        self.root_dir = Path(root_dir)
        self.num_images = num_images
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.setup_directories()
        self.install_dependencies()

    def setup_directories(self):
        self.images_dir = self.root_dir / 'images'
        self.annotations_dir = self.root_dir / 'annotations'
        self.labels_dir = self.root_dir / 'labels'
        self.yolo_dir = Path('/content/yolov5')

        for dir_path in [self.root_dir, self.images_dir, self.annotations_dir, self.labels_dir]:
            dir_path.mkdir(parents=True, exist_ok=True)

    def install_dependencies(self):
        """Install the Python packages and the system library used by this pipeline.

        Runs pip for the Python packages, then ``apt-get update`` and
        ``apt-get install`` for ``libgl1-mesa-glx`` (presumably needed by
        OpenCV at import time - confirm for the target image).

        Raises:
            subprocess.CalledProcessError: If any of the commands exits with a
                non-zero status.
        """
        import sys  # local import: only this method needs the interpreter path

        packages = [
            'opencv-python-headless',
            'pycocotools',
            'tqdm',
            'pyyaml'
        ]
        # Go through the running interpreter so the packages are installed into
        # the environment this code executes in, not whichever `pip` is on PATH.
        subprocess.run([sys.executable, '-m', 'pip', 'install', *packages], check=True)
        subprocess.run(['apt-get', 'update'], check=True)
        subprocess.run(['apt-get', 'install', '-y', 'libgl1-mesa-glx'], check=True)

    def download_coco(self, timeout=60):
        """Download the COCO 2017 annotations and the first ``num_images`` training images.

        The annotation archive is fetched and extracted only when the extracted
        ``instances_train2017.json`` is missing. Images that already exist in
        ``images_dir`` are not downloaded again.

        Args:
            timeout: Seconds passed to ``requests`` as the timeout of each
                HTTP request.

        Raises:
            requests.HTTPError: If a download returns an error status.
        """
        ann_url = 'http://images.cocodataset.org/annotations/annotations_trainval2017.zip'
        ann_file = self.root_dir / 'annotations.zip'
        annotation_file = self.annotations_dir / 'annotations' / 'instances_train2017.json'

        # The zip is removed after extraction, so the extracted JSON (not the
        # zip) is the marker that tells us the annotations are already present.
        if not annotation_file.exists():
            print("Downloading annotations...")
            try:
                # Stream to disk: the archive is large and should not be held
                # in memory as a single bytes object.
                with requests.get(ann_url, stream=True, timeout=timeout) as r:
                    r.raise_for_status()
                    with open(ann_file, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=1 << 20):
                            f.write(chunk)

                with zipfile.ZipFile(ann_file, 'r') as zip_ref:
                    zip_ref.extractall(self.annotations_dir)
            finally:
                # Never leave a (possibly partial) archive behind.
                if ann_file.exists():
                    ann_file.unlink()

        coco = COCO(str(annotation_file))
        img_ids = coco.getImgIds()[:self.num_images]

        for img_id in tqdm(img_ids, desc="Downloading images"):
            img = coco.loadImgs(img_id)[0]
            img_path = self.images_dir / img['file_name']
            if img_path.exists():
                continue
            img_url = f"http://images.cocodataset.org/train2017/{img['file_name']}"
            r = requests.get(img_url, timeout=timeout)
            r.raise_for_status()
            img_path.write_bytes(r.content)

        print("Dataset download complete!")

    def load_dataset(self):
        """Build training targets for every downloaded image that has usable boxes.

        Walks all image ids of ``instances_train2017.json`` and keeps those
        whose file exists in ``images_dir``. COCO ``[x, y, w, h]`` boxes are
        converted to ``[x1, y1, x2, y2]``.

        Boxes with a non-positive width or height are dropped, because
        torchvision detection models reject such boxes in training mode.
        Images left without any box are skipped.

        Returns:
            A tuple ``(images, annotations, unique_labels)``: a list of image
            paths (``str``), a parallel list of dicts with a float32 ``boxes``
            tensor and an int64 ``labels`` tensor, and the set of COCO
            category ids that occur in the kept boxes.
        """
        annotation_file = self.annotations_dir / 'annotations' / 'instances_train2017.json'
        coco = COCO(str(annotation_file))
        images = []
        annotations = []
        unique_labels = set()

        for img_id in coco.getImgIds():
            img_info = coco.loadImgs(img_id)[0]
            img_path = self.images_dir / img_info['file_name']
            if not img_path.exists():
                continue

            boxes = []
            labels = []
            for ann in coco.loadAnns(coco.getAnnIds(imgIds=img_id)):
                x, y, w, h = ann['bbox']
                if w <= 0 or h <= 0:
                    # Degenerate box: would abort Faster R-CNN training.
                    continue
                boxes.append([x, y, x + w, y + h])
                labels.append(ann['category_id'])

            if not boxes:
                continue

            unique_labels.update(labels)
            images.append(str(img_path))
            annotations.append({
                'boxes': torch.tensor(boxes, dtype=torch.float32),
                'labels': torch.tensor(labels, dtype=torch.int64)
            })

        return images, annotations, unique_labels

    def prepare_yolo_dataset(self, images, annotations, unique_labels):
        self.labels_dir.mkdir(exist_ok=True)
        label_map = {label: idx for idx, label in enumerate(sorted(unique_labels))}

        for img_path, ann in zip(images, annotations):
            img_filename = Path(img_path).name
            label_filename = Path(img_filename).stem + '.txt'
            label_path = self.labels_dir / label_filename

            with open(label_path, 'w') as f:
                img = Image.open(img_path)
                img_width, img_height = img.size

                for box, label in zip(ann['boxes'], ann['labels']):
                    x, y, x2, y2 = box.tolist()
                    center_x = (x + (x2 - x) / 2) / img_width
                    center_y = (y + (y2 - y) / 2) / img_height
                    width = (x2 - x) / img_width
                    height = (y2 - y) / img_height

                    yolo_label = label_map[label.item()]
                    f.write(f"{yolo_label} {center_x} {center_y} {width} {height}\n")

        return label_map

    def setup_yolo_training(self, unique_labels):
        """Fetch YOLOv5, install its requirements and write the dataset YAML.

        The repository is cloned into ``yolo_dir`` only when that directory
        does not exist yet. The generated ``data.yaml`` points both ``train``
        and ``val`` at ``images_dir`` and names each class after its label id.

        Args:
            unique_labels: Collection of label ids present in the dataset.

        Returns:
            Path of the written ``data.yaml`` (inside ``root_dir``).
        """
        yolo_root = self.yolo_dir
        if not yolo_root.exists():
            print("Cloning YOLOv5 repository...")
            clone_cmd = ['git', 'clone', 'https://github.com/ultralytics/yolov5.git', str(yolo_root)]
            subprocess.run(clone_cmd, check=True)

        install_cmd = ['pip', 'install', '-r', str(yolo_root / 'requirements.txt')]
        subprocess.run(install_cmd, check=True)

        class_names = []
        for label in sorted(unique_labels):
            class_names.append(str(label))

        config = {}
        config['train'] = str(self.images_dir)
        config['val'] = str(self.images_dir)
        config['nc'] = len(unique_labels)
        config['names'] = class_names

        data_yaml_path = self.root_dir / 'data.yaml'
        with open(data_yaml_path, 'w') as handle:
            yaml.dump(config, handle)

        return data_yaml_path

    def train_faster_rcnn(self, image_paths, annotations, num_epochs=1):
        model = fasterrcnn_resnet50_fpn(weights='DEFAULT')
        model.to(self.device)
        model.train()

        dataset = list(zip(image_paths, annotations))
        data_loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=2,
            shuffle=True,
            collate_fn=lambda x: tuple(zip(*x))
        )

        optimizer = torch.optim.SGD(
            model.parameters(),
            lr=0.005,
            momentum=0.9,
            weight_decay=0.0005
        )

        print("Starting Faster R-CNN training...")
        for epoch in range(num_epochs):
            total_loss = 0
            for images, targets in tqdm(data_loader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
                images = [torchvision.transforms.functional.to_tensor(Image.open(img).convert("RGB")).to(self.device)
                         for img in images]
                targets = [{k: v.to(self.device) for k, v in t.items()} for t in targets]

                loss_dict = model(images, targets)
                losses = sum(loss for loss in loss_dict.values())

                optimizer.zero_grad()
                losses.backward()
                optimizer.step()

                total_loss += losses.item()

            avg_loss = total_loss / len(data_loader)
            print(f"Epoch {epoch + 1}/{num_epochs}, Average Loss: {avg_loss:.4f}")

        return model

    def train_yolo(self, data_yaml_path, img_size=640, batch_size=8, epochs=1):
        original_dir = os.getcwd()
        os.chdir(str(self.yolo_dir))

        cmd = [
            'python', 'train.py',
            '--img', str(img_size),
            '--batch', str(batch_size),
            '--epochs', str(epochs),
            '--data', str(data_yaml_path),
            '--weights', 'yolov5s.pt'
        ]

        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
        print("YOLOv5 Training Output:\n", result.stdout)

        os.chdir(original_dir)

    def load_trained_yolo(self):
        weights_path = self.yolo_dir / 'runs/train/exp/weights/best.pt'
        if weights_path.exists():
            return torch.hub.load('ultralytics/yolov5', 'custom', path=str(weights_path))
        return None

    def detect_objects_webcam(self, rcnn_model, yolo_model, label_map):
        """Run real-time object detection on webcam frames with both models.

        Faster R-CNN detections are drawn in green and YOLOv5 detections in
        blue (BGR ``(255, 0, 0)``); only detections with a confidence of at
        least 0.5 are drawn. The loop ends when ``q`` is pressed or a frame
        cannot be read.

        Args:
            rcnn_model: Faster R-CNN model; switched to eval mode here.
            yolo_model: YOLOv5 model, or ``None`` to skip YOLO detection.
            label_map: Mapping from original label id to YOLO class index, as
                returned by ``prepare_yolo_dataset``.
        """
        rcnn_model.eval()
        # YOLO predicts class indices; invert the map to show original label ids.
        inv_label_map = {v: k for k, v in label_map.items()}

        # Initialize webcam
        print("Initializing webcam...")
        cap = cv2.VideoCapture(0)

        if not cap.isOpened():
            print("Error: Could not open webcam")
            cap.release()
            return

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("Error: Could not read frame")
                    break

                # Convert BGR to RGB for model input
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Faster R-CNN detection
                rcnn_image = torchvision.transforms.functional.to_tensor(rgb_frame).unsqueeze(0).to(self.device)
                with torch.no_grad():
                    rcnn_prediction = rcnn_model(rcnn_image)[0]

                # YOLOv5 detection; stays empty when no YOLO model is available
                yolo_boxes = []
                if yolo_model is not None:
                    yolo_results = yolo_model(rgb_frame)
                    # Move to CPU first: .numpy() fails on CUDA tensors.
                    yolo_boxes = yolo_results.xyxy[0].cpu().numpy()

                # Draw Faster R-CNN results
                for box, label, score in zip(rcnn_prediction['boxes'], rcnn_prediction['labels'],
                                             rcnn_prediction['scores']):
                    score = float(score)
                    if score >= 0.5:  # Confidence threshold
                        x1, y1, x2, y2 = map(int, box.cpu().numpy())
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                        # The model was trained on the original label ids, so
                        # its label is shown as-is (not via the YOLO index map).
                        cv2.putText(frame, f"{label.item()}: {score:.2f}", (x1, y1 - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

                # Draw YOLO results
                for box in yolo_boxes:
                    x1, y1, x2, y2, conf, cls = box
                    if conf >= 0.5:  # Confidence threshold
                        x1, y1, x2, y2 = map(int, (x1, y1, x2, y2))
                        cls_idx = int(cls)
                        # Fall back to the raw index for classes missing from the map.
                        name = inv_label_map.get(cls_idx, cls_idx)
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
                        cv2.putText(frame, f"{name}: {conf:.2f}", (x1, y1 - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

                cv2.imshow('Webcam Object Detection', frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        finally:
            cap.release()
            cv2.destroyAllWindows()

if __name__ == '__main__':
    # End-to-end run: build the dataset, train both detectors, then compare
    # them live on the webcam.
    # Constructing the trainer creates the directories and installs dependencies.
    trainer = ObjectDetectionTrainer()
    trainer.download_coco()
    images, annotations, unique_labels = trainer.load_dataset()
    # label_map maps original label ids to the class indices written to the YOLO labels.
    label_map = trainer.prepare_yolo_dataset(images, annotations, unique_labels)
    data_yaml_path = trainer.setup_yolo_training(unique_labels)

    # Train models (one epoch each)
    rcnn_model = trainer.train_faster_rcnn(images, annotations, num_epochs=1)
    trainer.train_yolo(data_yaml_path, img_size=640, batch_size=8, epochs=1)

    # Load YOLO model; this is None when no trained weights were found
    yolo_model = trainer.load_trained_yolo()

    # Detect objects using webcam
    trainer.detect_objects_webcam(rcnn_model, yolo_model, label_map)


Downloading annotations...
loading annotations into memory...
Done (t=22.84s)
creating index...
index created!


Downloading images: 100%|██████████| 100/100 [00:00<00:00, 28956.19it/s]

Dataset download complete!





loading annotations into memory...
Done (t=18.45s)
creating index...
index created!
Starting Faster R-CNN training...


Epoch 1/1:   6%|▌         | 3/50 [02:30<39:25, 50.33s/it]