<a href="https://colab.research.google.com/github/VIVEKPATIL12/DS_Combined/blob/main/speedBreaker.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!unzip /content/SpeedBMainCode.zip

In [1]:
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torchvision.models.detection as detection




In [None]:
# Get a list of all image files in the directory
image_dir = "/content/train/images"
image_files = [os.path.join(image_dir, f) for f in os.listdir(image_dir) if f.endswith(".jpg") or f.endswith(".png")]

# Get a list of corresponding label files
label_dir = "/content/train/labels"
label_files = [os.path.join(label_dir, f) for f in os.listdir(label_dir) if f.endswith(".xml") or f.endswith(".txt")]

# Check if the number of image files matches the number of label files
assert len(image_files) == len(label_files), "Number of image files and label files do not match"

# Split the data into training and validation sets
train_images, val_images, train_labels, val_labels = train_test_split(image_files, label_files, test_size=0.2, random_state=42)

In [None]:
class SpeedBreakerDataset(Dataset):
    def __init__(self, images, labels, transform=None, max_labels=100):
        self.images = images
        self.labels = labels
        self.transform = transform
        self.max_labels = max_labels

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        image = cv2.imread(self.images[index])
        label_file = self.labels[index]

        # Read the label from the file
        with open(label_file, 'r') as f:
            lines = f.readlines()

        labels = []
        boxes = []
        for line in lines:
            values = line.strip().split()
            class_label = int(values[0])
            x, y, w, h = float(values[1]), float(values[2]), float(values[3]), float(values[4])
            labels.append(class_label)
            # convert (x, y, w, h) to (x1, y1, x2, y2)
            x1 = x - w/2
            y1 = y - h/2
            x2 = x + w/2
            y2 = y + h/2
            boxes.append([x1, y1, x2, y2])

        target = {
            "boxes": torch.tensor(boxes, dtype=torch.float32),
            "labels": torch.tensor(labels, dtype=torch.long),
            "image_id": torch.tensor([index]),
            "area": torch.tensor([abs((box[2]-box[0])*(box[3]-box[1])) for box in boxes], dtype=torch.float32),
            "iscrowd": torch.tensor([0]*len(boxes), dtype=torch.int64)
        }

        if self.transform:
            image = self.transform(image)

        return image, target

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))])

train_dataset = SpeedBreakerDataset(train_images, train_labels, transform=transform)
val_dataset = SpeedBreakerDataset(val_images, val_labels, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=lambda x: tuple(map(list, zip(*x))))
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, collate_fn=lambda x: tuple(map(list, zip(*x))))


# Define YOLO model and optimizer
model = detection.fasterrcnn_resnet50_fpn(pretrained=True)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Train YOLO model
for epoch in range(10):
    model.train()
    for images, targets in train_loader:
        images = torch.stack(images).to(device)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        optimizer.zero_grad()

        outputs = model(images, targets)
        loss = sum([v for v in outputs.values()])

        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}, Loss: {loss.item()}")

# Evaluate YOLO model on validation set
model.eval()
with torch.no_grad():
    total_correct = 0
    for images, targets in val_loader:
        images = torch.stack(images).to(device)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        outputs = model(images, targets)
        loss = sum([v for v in outputs.values()])

        total_correct += 1

    accuracy = total_correct / len(val_loader)
    print(f"Validation Accuracy: {accuracy:.2f}%")

# Use trained YOLO model to detect speed breakers
def detect_speed_breaker(image_path):
    image = cv2.imread(image_path)
    (H, W) = image.shape[:2]

    model.eval()
    with torch.no_grad():
        image = transforms.ToTensor()(image)
        image = image.unsqueeze(0).to(device)
        outputs = model(image)

        scores = outputs[0]['scores'].detach().cpu().numpy()
        classIDs = outputs[0]['labels'].detach().cpu().numpy()
        boxes = outputs[0]['boxes'].detach().cpu().numpy()

        for box, score, classID in zip(boxes, scores, classIDs):
            if score > 0.5:
                (x1, y1, x2, y2) = box.astype("int")
                cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)

        cv2.imshow("Image", image)
        cv2.waitKey(0)

detect_speed_breaker("/content/train/images/speedbreakerrefinedone (372).jpg")

torch.save(model.state_dict(), 'yolo_speed_breakerDetector.pth')
