In [None]:
import zipfile
# Unpack the dataset archive into the folder that every later cell reads from.
extract_path = "/content/dataset"
# The context manager closes the archive handle even if extraction fails part-way.
with zipfile.ZipFile('/content/SSD_Data.zip') as z:
    z.extractall(extract_path)

In [None]:
import torch
import torchvision
import cv2
import os
from torch.utils.data import DataLoader
from torchvision.transforms import transforms
from torchvision.models.detection.ssd import SSDClassificationHead
from torchvision.models.detection import _utils
from torchvision.models.detection import SSD300_VGG16_Weights

In [None]:
# Prune unusable label files from the training split before the dataset is built:
#   * labels whose matching '<stem>.jpg' image is missing, and
#   * labels containing any row with fewer than five whitespace-separated fields.
annotation_dirs = ["/content/dataset/train/labels"]
images_dirs = ["/content/dataset/train/images"]

print(f'Before : {len(os.listdir(annotation_dirs[0]))}')

for annotation_dir, images_dir in zip(annotation_dirs, images_dirs):
    annotations = os.listdir(annotation_dir)
    images = set(os.listdir(images_dir))  # Use a set for faster lookup

    for annotation in annotations:
        annotation_file = os.path.join(annotation_dir, annotation)

        # Sub-directories are not label files and os.remove() cannot delete
        # them, so leave them untouched instead of crashing the cell.
        if not os.path.isfile(annotation_file):
            continue

        image_file_name = os.path.splitext(annotation)[0] + '.jpg'

        # Delete annotation file if the corresponding image is missing
        if image_file_name not in images:
            os.remove(annotation_file)
            continue  # No need to check further

        # Inspect every row (not only the first one). The file is closed by the
        # `with` block before it is deleted.
        with open(annotation_file, "r") as f:
            has_short_row = any(len(line.split()) < 5 for line in f)

        if has_short_row:
            os.remove(annotation_file)

print(f'After : {len(os.listdir(annotation_dirs[0]))}')

In [None]:
class CustomDataset():
    """Map-style detection dataset pairing ``.jpg`` images with YOLO-style label files.

    Each label row is read as ``class x_center y_center width height`` where the
    four geometry values are multiplied by the image width/height to obtain
    pixel coordinates.

    Args:
        image_dir: Directory containing the images.
        annotations: Sequence of label file names; the image name is the label
            file's stem plus ``.jpg``.
        annotations_dir: Directory containing the label files.
        transform: Optional callable applied to the RGB image array.
    """

    def __init__(self, image_dir, annotations, annotations_dir, transform=None):
        self.image_dir = image_dir
        self.annotations = annotations
        self.annotations_dir = annotations_dir
        self.transform = transform

    def __len__(self):
        """Return the number of label files (one sample per label file)."""
        return len(self.annotations)

    @staticmethod
    def _yolo_to_xyxy(fields, width, height):
        """Convert one label row to ``[xmin, ymin, xmax, ymax]`` in pixels.

        Args:
            fields: Row split on whitespace; indices 1-4 hold the relative
                x_center, y_center, width and height.
            width: Image width in pixels.
            height: Image height in pixels.
        """
        x_center = float(fields[1]) * width
        y_center = float(fields[2]) * height
        half_w = float(fields[3]) * width / 2
        half_h = float(fields[4]) * height / 2
        return [x_center - half_w, y_center - half_h, x_center + half_w, y_center + half_h]

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(image, target)`` where ``target`` is a dict with ``"boxes"``
            (float32, shape ``(N, 4)``) and ``"labels"`` (int64, shape ``(N,)``).
            Stored class indices are shifted by +1.

        Raises:
            FileNotFoundError: If the image cannot be read.
            ValueError: If a non-blank label row has fewer than five fields.
        """
        # Get annotation file name and derive the image path from its stem
        annotation = self.annotations[idx]
        img_path = os.path.join(self.image_dir, os.path.splitext(annotation)[0] + '.jpg')

        # Load image (cv2.imread returns None instead of raising on failure)
        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f"Image {img_path} not found.")

        # Convert image channels
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Get image dimensions
        height, width = image.shape[:2]

        # Parse the annotation file
        annotation_file = os.path.join(self.annotations_dir, annotation)
        boxes = []
        labels = []

        with open(annotation_file, "r") as f:
            for line_number, line in enumerate(f, start=1):
                data = line.split()
                if not data:
                    continue  # tolerate blank rows (e.g. a trailing newline)
                if len(data) < 5:
                    raise ValueError(
                        f"{annotation_file}:{line_number}: expected at least 5 fields, got {len(data)}"
                    )
                boxes.append(self._yolo_to_xyxy(data, width, height))
                labels.append(int(data[0]) + 1)

        # Convert to PyTorch tensors. reshape keeps the (N, 4) layout even when
        # the label file has no rows, where torch.tensor([]) alone gives (0,).
        boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.tensor(labels, dtype=torch.int64)

        # Apply transformations
        if self.transform:
            image = self.transform(image)

        # Create target dictionary
        target = {"boxes": boxes, "labels": labels}

        return image, target

In [None]:
# Factory for the SSD detector used throughout the notebook
def create_model(num_classes=6, size=300):
    """Build an SSD300-VGG16 detector with a fresh classification head.

    The model is created from the COCO_V1 weights, then its classification
    head is replaced by a new ``SSDClassificationHead`` sized for
    ``num_classes``.

    Args:
        num_classes: Number of classes passed to the new classification head.
        size: Edge length used when probing the backbone output channels and
            written to the transform's ``min_size`` / ``max_size``.

    Returns:
        The modified torchvision SSD model.
    """
    detector = torchvision.models.detection.ssd300_vgg16(weights=SSD300_VGG16_Weights.COCO_V1)

    # Channel counts of the backbone feature maps for a (size, size) input
    backbone_channels = _utils.retrieve_out_channels(detector.backbone, (size, size))

    # Anchors per feature-map location, as reported by the anchor generator
    anchors_per_location = detector.anchor_generator.num_anchors_per_location()

    # Swap in a classification head that predicts `num_classes` scores per anchor
    new_head = SSDClassificationHead(
        in_channels=backbone_channels,
        num_anchors=anchors_per_location,
        num_classes=num_classes,
    )
    detector.head.classification_head = new_head

    # NOTE(review): torchvision's SSD appears to build its transform with a
    # fixed_size of (300, 300); if that takes precedence, these two assignments
    # may not change the actual input resolution. TODO confirm.
    detector.transform.min_size, detector.transform.max_size = (size,), size

    return detector

# Build the detector instance that the later cells move to the device and train.
model = create_model(num_classes=6, size=640)
# num_classes=6 sizes the replacement classification head for six labels; the
# plotting cells in this notebook name them 'bg', 'ambulance', 'bus', 'car',
# 'motorcycle' and 'truck'. size=640 is the value create_model writes into the
# model transform's min_size/max_size.

In [None]:
# Print the module tree of the detector, including the replaced classification head.
print(model)

In [None]:
# Step 4: Dataset and DataLoader

## TRAIN DATALOADER
train_image_dir = "/content/dataset/train/images"
train_annotation_dir = "/content/dataset/train/labels"
# os.listdir returns every entry in arbitrary order; keep only the label files
# and sort them so dataset indices are stable between runs.
train_annotations = sorted(
    name for name in os.listdir(train_annotation_dir) if name.endswith('.txt')
)

train_dataset = CustomDataset(train_image_dir, train_annotations, train_annotation_dir, transform=transforms.ToTensor())
# Samples carry a variable number of boxes, so each batch is collated into a
# tuple of images and a tuple of targets instead of stacked tensors.
train_data_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))

In [None]:
# Example test data loader (yours may differ)
test_image_dir = "/content/dataset/test/images"
test_annotation_dir = "/content/dataset/test/labels"
# os.listdir returns every entry in arbitrary order; keep only the label files
# and sort them so dataset indices are stable between runs.
test_annotations = sorted(
    name for name in os.listdir(test_annotation_dir) if name.endswith('.txt')
)

test_dataset = CustomDataset(test_image_dir, test_annotations, test_annotation_dir, transform=transforms.ToTensor())
# Samples carry a variable number of boxes, so each batch is collated into a
# tuple of images and a tuple of targets instead of stacked tensors.
test_data_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=lambda x: tuple(zip(*x)))

In [None]:
# Location of the validation split
val_image_dir = "/content/dataset/valid/images"
val_annotation_dir = "/content/dataset/valid/labels"
# os.listdir returns every entry in arbitrary order; keep only the label files
# and sort them so dataset indices are stable between runs.
val_annotations = sorted(
    name for name in os.listdir(val_annotation_dir) if name.endswith('.txt')
)

# Dataset and DataLoader
val_dataset = CustomDataset(val_image_dir, val_annotations, val_annotation_dir, transform=transforms.ToTensor())
# Samples carry a variable number of boxes, so each batch is collated into a
# tuple of images and a tuple of targets instead of stacked tensors.
val_data_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, collate_fn=lambda x: tuple(zip(*x)))

In [None]:
# Report the number of batches in the training loader, then in the test loader.
for loader in (train_data_loader, test_data_loader):
    print(len(loader))

In [None]:
# Training configuration: device, optimiser, learning-rate schedule, epoch count.
num_epochs = 10

# Use the GPU when one is available, otherwise fall back to the CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
model.to(device)

# Optimise only the parameters that require gradients
params = [param for param in model.parameters() if param.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9)

# Multiply the learning rate by 0.1 every 3 scheduler steps
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Sanity check: draw the first two samples of one training batch together with
# their ground-truth boxes and class names.
class_names = ['bg', 'ambulance', 'bus', 'car', 'motorcycle', 'truck']

# Two side-by-side panels
fig, ax = plt.subplots(1, 2, figsize=(10, 6))
ax = ax.ravel()

# Pull a single batch from the training loader
images, targets = next(iter(train_data_loader))

for idx in range(2):
    panel = ax[idx]
    target = targets[idx]

    # (C, H, W) tensor -> (H, W, C) array for imshow
    image = images[idx].permute(1, 2, 0).numpy()
    boxes, labels = target["boxes"].numpy(), target["labels"].numpy()

    panel.imshow(image)

    # Outline every ground-truth box and tag it with its class name
    for (xmin, ymin, xmax, ymax), label in zip(boxes, labels):
        width, height = xmax - xmin, ymax - ymin
        panel.add_patch(
            patches.Rectangle((xmin, ymin), width, height, linewidth=2, edgecolor="red", facecolor="none")
        )
        panel.text(
            xmin,
            ymin - 10,
            f"{class_names[label]}",
            color="red",
            fontsize=12,
            bbox={"facecolor": "yellow", "alpha": 0.5, "edgecolor": "red"},
        )

# Render the figure
plt.show()

In [None]:
from collections import defaultdict
import torchvision.ops as ops

def evaluate_model(model, data_loader, device, iou_threshold=0.5, score_threshold=0.5):
    """Evaluate a detector and report macro-averaged precision/recall and IoU-based scores.

    Detections scoring above ``score_threshold`` are each compared with the
    ground-truth box they overlap most. A detection is a true positive when that
    IoU is at least ``iou_threshold``, the labels agree and the ground-truth box
    has not been matched already; otherwise it is a false positive. Unmatched
    ground-truth boxes are false negatives. On images without ground truth every
    kept detection is a false positive.

    Per-class values are averaged over the classes that occur in the ground
    truth, so a class the model never detects contributes zeros instead of being
    left out.

    NOTE: the values reported as "mAP50" and "mAP50-95" are not
    precision-recall-curve AP. "mAP50" is the fraction of matched detections
    whose IoU exceeds 0.5, and "mAP50-95" is the mean IoU of matched detections.

    Args:
        model: Detection model; called with a list of image tensors and expected
            to return one dict with 'boxes', 'labels' and 'scores' per image.
        data_loader: Iterable yielding ``(images, targets)`` batches where each
            target is a dict with "boxes" and "labels".
        device: Device the images and ground-truth boxes are moved to.
        iou_threshold: Minimum IoU for a detection to count as a match.
        score_threshold: Detections must score strictly above this to be kept.

    Returns:
        dict with keys "precision", "recall", "mAP50" and "mAP50-95".

    Side effects:
        Prints a one-line summary. If notebook-level lists named
        ``val_precisions``, ``val_recalls``, ``val_map50`` and ``val_map5095``
        exist, the corresponding value is appended to each of them.
    """
    model.eval()
    total_tp = defaultdict(int)
    total_fp = defaultdict(int)
    total_fn = defaultdict(int)
    ap_per_class = defaultdict(list)  # IoU of every true positive, keyed by class

    with torch.no_grad():
        for images, targets in data_loader:
            images = [img.to(device) for img in images]
            outputs = model(images)

            for output, target in zip(outputs, targets):
                keep = output['scores'] > score_threshold
                pred_boxes = output['boxes'][keep]
                pred_labels = output['labels'][keep].tolist()

                gt_boxes = target["boxes"].to(device)
                gt_labels = target["labels"].tolist()

                # Nothing to match against: every kept detection is a false positive.
                if len(gt_labels) == 0:
                    for label in pred_labels:
                        total_fp[label] += 1
                    continue

                matched_gt = set()
                if pred_labels:
                    # One IoU matrix per image; row i holds detection i vs. all GT boxes.
                    best_ious, best_gt = ops.box_iou(pred_boxes, gt_boxes).max(dim=1)
                    for label, iou, gt_idx in zip(pred_labels, best_ious.tolist(), best_gt.tolist()):
                        is_match = (
                            iou >= iou_threshold
                            and gt_labels[gt_idx] == label
                            and gt_idx not in matched_gt
                        )
                        if is_match:
                            total_tp[label] += 1
                            matched_gt.add(gt_idx)
                            ap_per_class[label].append(iou)
                        else:
                            total_fp[label] += 1

                for gt_idx, gt_label in enumerate(gt_labels):
                    if gt_idx not in matched_gt:
                        total_fn[gt_label] += 1

    # Classes with at least one ground-truth instance (tp + fn > 0). Keys only
    # exist in these dicts once they have been incremented.
    evaluated_classes = sorted(set(total_tp) | set(total_fn))

    total_p, total_r, total_ap50, total_ap5095 = 0, 0, 0, 0
    for label in evaluated_classes:
        tp = total_tp[label]
        fp = total_fp[label]
        fn = total_fn[label]
        aps = ap_per_class[label]
        total_p += tp / (tp + fp) if (tp + fp) > 0 else 0
        total_r += tp / (tp + fn) if (tp + fn) > 0 else 0
        total_ap50 += sum(1 for ap in aps if ap > 0.5) / len(aps) if aps else 0
        total_ap5095 += sum(aps) / len(aps) if aps else 0

    # Guard the averages: with no ground truth at all there is nothing to divide by.
    count = len(evaluated_classes)
    avg_p = total_p / count if count else 0.0
    avg_r = total_r / count if count else 0.0
    avg_ap50 = total_ap50 / count if count else 0.0
    avg_ap5095 = total_ap5095 / count if count else 0.0

    # Keep feeding the notebook-level history lists used by the plotting cells,
    # but do not fail when this function runs before they have been created.
    history = globals()
    for name, value in (
        ("val_precisions", avg_p),
        ("val_recalls", avg_r),
        ("val_map50", avg_ap50),
        ("val_map5095", avg_ap5095),
    ):
        series = history.get(name)
        if isinstance(series, list):
            series.append(value)

    print(f"Precision: {avg_p:.3f}, Recall: {avg_r:.3f}, mAP50: {avg_ap50:.3f}, mAP50-95: {avg_ap5095:.3f}")

    return {"precision": avg_p, "recall": avg_r, "mAP50": avg_ap50, "mAP50-95": avg_ap5095}


In [None]:
# Per-epoch history consumed by the plotting cells. The validation lists are
# filled by evaluate_model.
train_loss_list = []
val_precisions = []
val_recalls = []
val_map50 = []
val_map5095 = []
epochs = []

for epoch in range(num_epochs):
    model.train()
    train_running_loss = 0

    for images, targets in train_data_loader:
        images = list(img.to(device) for img in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # In training mode the model returns a dict of loss terms; optimise their sum.
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        train_running_loss += losses.item()

    scheduler.step()
    # One loss value is accumulated per batch, so average over the number of
    # batches (not samples) to keep the figure independent of the batch size.
    epoch_loss = train_running_loss / max(len(train_data_loader), 1)
    train_loss_list.append(epoch_loss)
    epochs.append(epoch + 1)
    print(f"Epoch {epoch+1} - Train Loss: {epoch_loss:.4f}")

    # Evaluation on the validation split, then a checkpoint for this epoch
    evaluate_model(model, val_data_loader, device)
    torch.save(model.state_dict(), f"ssd_epoch_{epoch+1}.pth")


In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# One panel per tracked series; series with at least five points also get a
# dotted 3-point rolling mean.
metrics = [train_loss_list, val_precisions, val_recalls, val_map50, val_map5095]
titles = ["train/loss", "val/precision", "val/recall", "metrics/mAP50", "metrics/mAP50-95"]

plt.figure(figsize=(15, 8))
for position, (data, title) in enumerate(zip(metrics, titles), start=1):
    plt.subplot(2, 3, position)
    plt.plot(data, label='value', marker='o')
    if not len(data) < 5:
        smoothed = pd.Series(data).rolling(3).mean()
        plt.plot(smoothed, linestyle='dotted', label='smooth')
    plt.title(title)
    plt.legend()
    plt.grid()
plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt

# Training loss against the epoch number on a single set of axes.
loss_fig, loss_ax = plt.subplots(figsize=(10, 5))
loss_ax.plot(epochs, train_loss_list, marker='o', color='b')
loss_ax.set_title("Training Loss per Epoch")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.grid(True)
loss_fig.tight_layout()
plt.show()


In [None]:
# Save the model's current state_dict (weights only, not the module object).
torch.save(model.state_dict(), "SSD_custom1.pth")

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np
from PIL import Image
import random
import os
import torch
from torchvision.transforms import transforms


# Step 1: Recreate the model architecture and restore the trained weights
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = create_model(num_classes=6, size=640)

# map_location lets a checkpoint written on a GPU runtime load on a CPU-only one.
model.load_state_dict(torch.load("/content/ssd_epoch_10.pth", map_location=device))
model.eval()  # Set the model to evaluation mode
model.to(device)  # Ensure the model is on the correct device

transform = transforms.ToTensor()

test_dir = "/content/dataset/test/images"
# Keep image files only; os.listdir also returns any other entry in the folder.
test_list_file = [
    name for name in os.listdir(test_dir)
    if name.lower().endswith(('.jpg', '.jpeg', '.png'))
]

class_names = ['bg', 'ambulance', 'bus', 'car', 'motorcycle', 'truck']

# Palette for the boxes; built once instead of on every loop iteration.
colors = [
    (1, 0, 0),       # Red
    (0, 1, 0),       # Green
    (0, 0, 1),       # Blue
    (1, 0, 1),       # Magenta
    (1, 1, 0),       # Yellow
    (0, 1, 1),       # Cyan
    (1, 0.55, 0),    # Orange
    (0, 0.5, 0)      # Dark Green
]

# Create the 4x4 plot grid; axes are hidden up front so unused panels stay blank.
fig, ax = plt.subplots(4, 4, figsize=(20,20))
ax = ax.ravel()
for panel in ax:
    panel.axis("off")

# Sample without replacement so the grid never shows the same image twice, and
# cope with test folders that hold fewer images than there are panels.
sampled_files = random.sample(test_list_file, min(len(ax), len(test_list_file)))

for idx, file_name in enumerate(sampled_files):

    # Prepare the image; convert to RGB so grayscale/RGBA files yield 3 channels,
    # and close the file handle as soon as the tensor has been built.
    img_path = os.path.join(test_dir, file_name)
    with Image.open(img_path) as pil_image:
        image = transform(pil_image.convert("RGB"))
    image_tensor = image.unsqueeze(0).to(device)  # Add a batch dimension

    # Get predictions
    with torch.no_grad():
        prediction = model(image_tensor)[0]  # Get the first image's predictions

    # Convert image to numpy for visualization
    image_np = image.cpu().permute(1, 2, 0).numpy()  # Convert (C, H, W) -> (H, W, C)

    # Extract predictions
    boxes = prediction["boxes"].cpu().numpy()
    labels = prediction["labels"].cpu().numpy()
    scores = prediction["scores"].cpu().numpy()

    ax[idx].imshow(image_np)

    # Add bounding boxes to the image for detections scoring at least 0.5
    for box, label, score in zip(boxes, labels, scores):
        if score < 0.5:
            continue
        color = random.choice(colors)
        xmin, ymin, xmax, ymax = box
        width, height = xmax - xmin, ymax - ymin
        rect = patches.Rectangle(
            (xmin, ymin),
            width,
            height,
            linewidth=2,
            edgecolor=color,
            facecolor="none",
        )
        ax[idx].add_patch(rect)
        ax[idx].text(
            xmin,
            ymin - 8,
            f"{class_names[label]}",
            color='w',
            fontsize=12,
            bbox=dict(facecolor=color, alpha=0.5, edgecolor=color),
        )

# Display the image with bounding boxes
plt.show()

In [None]:
import os
import torch
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.metrics import confusion_matrix

# Class names (background excluded!)
class_names = ['ambulance', 'bus', 'car', 'motorcycle', 'truck']
num_classes = len(class_names)

# Folder paths
image_folder = '/content/dataset/valid/images'
label_folder = '/content/dataset/valid/labels'

# Ground-truth and predicted class lists
y_true, y_pred = [], []

# Device selection
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)

# Build the SSD model and load the trained weights
from torchvision.models.detection.ssd import ssd300_vgg16
model = ssd300_vgg16(weights=None, num_classes=num_classes + 1)  # +1 because background is included
state_dict = torch.load('/content/ssd_epoch_10.pth', map_location=device)
model.load_state_dict(state_dict)
model.to(device)
model.eval()

# Image preprocessing: resize to 300x300, then convert to a tensor
preprocessing_steps = [
    transforms.Resize((300, 300)),
    transforms.ToTensor(),
]
transform = transforms.Compose(preprocessing_steps)

# Label reader (YOLO format)
def load_labels(label_path):
    """Return the class index of every object row in a label file.

    The class index is the first whitespace-separated field of a row. Blank
    rows are skipped, so a trailing newline or an empty file yields no entries
    instead of an ``IndexError``.

    Args:
        label_path: Path to the label text file.

    Returns:
        List of integer class indices, in file order.
    """
    class_indices = []
    with open(label_path, 'r') as f:
        for line in f:
            fields = line.split()
            if fields:
                class_indices.append(int(fields[0]))
    return class_indices

# Run the detector on every validation image and compare its top-scoring
# detection with the first ground-truth class listed for that image.
for image_name in os.listdir(image_folder):
    # Case-insensitive extension check so files such as "x.JPG" are not skipped
    if not image_name.lower().endswith(('.jpg', '.png', '.jpeg')):
        continue

    image_path = os.path.join(image_folder, image_name)
    label_path = os.path.join(label_folder, image_name.rsplit('.', 1)[0] + '.txt')

    # Read the ground-truth classes first. An image without labels has no true
    # class to pair with, and appending only a prediction for it would shift
    # every later (true, predicted) pair.
    true_classes = load_labels(label_path)
    if len(true_classes) == 0:
        continue

    image = Image.open(image_path).convert('RGB')
    input_tensor = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = model(input_tensor)[0]

    scores = outputs['scores']
    labels = outputs['labels']  # class ids as produced by the model

    # Predicted class: the first detection, provided its confidence is above 0.5
    if len(scores) > 0 and scores[0] > 0.5:
        pred_class = labels[0].item()
        if pred_class != 0:  # not background
            # y_true and y_pred are always extended together so they stay aligned
            y_pred.append(pred_class - 1)  # drop background (0-based)
            y_true.append(true_classes[0])
    else:
        # The model predicted nothing although a label exists: record a missed detection
        y_pred.append(-1)
        y_true.append(true_classes[0])

# Filter out the -1 (missed detection) entries; they have no column in the matrix
filtered_true = []
filtered_pred = []
for t, p in zip(y_true, y_pred):
    if p != -1:
        filtered_true.append(t)
        filtered_pred.append(p)

# Confusion Matrix
cm = confusion_matrix(filtered_true, filtered_pred, labels=list(range(num_classes)))

# Heatmap visualisation
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='d', xticklabels=class_names, yticklabels=class_names, cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Heatmap for SSD Model (bg hariç)')
plt.show()
