In [1]:

# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os


In [None]:
!pip install -i https://test.pypi.org/simple/ supervision==0.3.0
!pip install -q transformers
!pip install -q pytorch-lightning
!pip install -q timm
!pip install -q roboflow
!pip install -q pycocotools
!pip install wurlitzer
!pip install jupyter-lsp==2.0.0
!pip install packaging>=22
!pip install shapely>=2.0.1
!pip install keras==2.15.0
!pip install numpy<1.26


In [None]:
import os
import torch
from transformers import DetrForObjectDetection, DetrImageProcessor
import supervision as sv
import pytorch_lightning as pl

# settings
CHECKPOINT = 'facebook/detr-resnet-50'
CONFIDENCE_THRESHOLD = 0.5
IOU_THRESHOLD = 0.8
# Use the first CUDA device when CUDA is available, otherwise the CPU.
DEVICE = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

# Load the image processor and the detection model published under CHECKPOINT,
# then place the model on DEVICE.
image_processor = DetrImageProcessor.from_pretrained(CHECKPOINT)
model = DetrForObjectDetection.from_pretrained(CHECKPOINT)
model.to(DEVICE)


In [None]:
import glob

def get_id2label(labels_folder):
    """Build the class-id -> class-name mapping from YOLO label files.

    Every ``*.txt`` file directly inside ``labels_folder`` is scanned. The first
    whitespace-separated field of each non-blank line is taken as the class id,
    and every distinct id is mapped to the single name ``"text_region"``.

    Args:
        labels_folder: Directory that holds the label ``.txt`` files.

    Returns:
        dict mapping each distinct integer class id to ``"text_region"``, in
        ascending id order. Empty when the folder has no label rows.

    Raises:
        ValueError: If a non-blank line does not start with a number.
    """
    class_ids = set()
    for label_path in glob.glob(os.path.join(labels_folder, "*.txt")):
        with open(label_path, 'r') as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Blank lines (e.g. a trailing newline) carry no annotation.
                    continue
                # Go through float so ids written as "0.0" are accepted, the
                # same way the dataset class parses label rows.
                class_ids.add(int(float(fields[0])))
    return {class_id: "text_region" for class_id in sorted(class_ids)}

# Build the class-id -> name mapping from the training label files.
# NOTE(review): only ids that actually occur in these files end up in the
# mapping -- confirm the training labels cover every class in the dataset.
id2label = get_id2label("/kaggle/input/yolovmawa/train-output-labels")


In [None]:
import os
import torch
import glob
import cv2
from torch.utils.data import Dataset
import torchvision.transforms as T

In [None]:
class YOLOv8Dataset(Dataset):
    """Detection dataset that pairs ``.jpg`` images with YOLO-format label files.

    Each sample is an image from ``images_folder`` plus the label file of the
    same base name in ``labels_folder``. Label rows
    (``class x_center y_center width height``, normalized to the image size)
    are converted to absolute ``[x_min, y_min, width, height]`` boxes and
    handed to ``image_processor`` as COCO-style annotations.

    Args:
        images_folder: Directory containing the ``*.jpg`` images.
        labels_folder: Directory containing the matching ``*.txt`` label files.
        image_processor: Callable processor invoked with ``images=``,
            ``annotations=`` and ``return_tensors="pt"``; its result must expose
            ``"pixel_values"`` and ``"labels"``.
        id2label: Class-id -> name mapping (stored, not used by this class).
    """

    def __init__(self, images_folder, labels_folder, image_processor, id2label):
        self.images_folder = images_folder
        self.labels_folder = labels_folder
        self.image_processor = image_processor
        self.id2label = id2label
        # glob order is filesystem dependent; sort so idx -> file (and thus
        # image_id) is stable between runs.
        self.image_paths = sorted(glob.glob(os.path.join(images_folder, "*.jpg")))
        self.label_paths = sorted(glob.glob(os.path.join(labels_folder, "*.txt")))
        self.transforms = T.Compose([
            T.ToTensor()
        ])

    def __len__(self):
        """Return the number of images found in ``images_folder``."""
        return len(self.image_paths)

    @staticmethod
    def _read_yolo_labels(label_path, image_width, image_height):
        """Parse one YOLO label file into absolute-pixel boxes.

        Args:
            label_path: Path of the label file to read.
            image_width: Image width in pixels used to de-normalize x values.
            image_height: Image height in pixels used to de-normalize y values.

        Returns:
            ``(boxes, labels, areas)`` where ``boxes`` holds
            ``[x_min, y_min, width, height]`` lists, ``labels`` the integer
            class ids and ``areas`` the box areas, all in file order.

        Raises:
            ValueError: If a non-blank line does not contain exactly five numbers.
        """
        boxes = []
        labels = []
        areas = []
        with open(label_path, 'r') as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Skip blank lines instead of failing on the unpack below.
                    continue
                class_id, x_center, y_center, width, height = map(float, fields)
                x_center *= image_width
                y_center *= image_height
                width *= image_width
                height *= image_height
                boxes.append([x_center - width / 2, y_center - height / 2, width, height])
                labels.append(int(class_id))
                areas.append(width * height)
        return boxes, labels, areas

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(pixel_values, target)``.

        Raises:
            OSError: If the image cannot be decoded by OpenCV.
            FileNotFoundError: If the matching label file does not exist.
        """
        image_path = self.image_paths[idx]
        # splitext only swaps the final extension; str.replace would rewrite
        # every ".jpg" occurrence inside the file name.
        stem = os.path.splitext(os.path.basename(image_path))[0]
        label_path = os.path.join(self.labels_folder, stem + ".txt")

        # Load image
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise OSError(f"Could not read image file: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # ToTensor turns the HWC uint8 array into a CHW float tensor already
        # scaled to [0, 1]. Dividing by 255 again would squash the pixels to
        # [0, 1/255], so no further rescaling is done here (and do_rescale is
        # disabled in the processor call below for the same reason).
        image = self.transforms(image)

        # Load annotations (image is CHW at this point)
        _, image_height, image_width = image.shape
        boxes, labels, areas = self._read_yolo_labels(label_path, image_width, image_height)

        # Prepare annotations in COCO format
        annotations = {
            "image_id": idx,
            "annotations": [
                {"bbox": box, "category_id": label, "area": area}
                for box, label, area in zip(boxes, labels, areas)
            ]
        }

        encoding = self.image_processor(
            images=image,
            annotations=annotations,
            return_tensors="pt",
            do_rescale=False,
            size={"shortest_edge": 800, "longest_edge": 800},
        )
        # Drop only the leading batch dimension added by the processor.
        pixel_values = encoding["pixel_values"].squeeze(0)
        target = encoding["labels"][0]

        return pixel_values, target

In [None]:
from torch.utils.data import DataLoader

def collate_fn(batch):
    """Combine ``(pixel_values, target)`` samples into a single batch dict.

    The per-sample image tensors are passed to the module-level
    ``image_processor.pad`` (requested as PyTorch tensors); the targets are
    kept as a plain list, one entry per sample.

    Args:
        batch: Sequence of samples where item 0 is the image tensor and item 1
            is its target.

    Returns:
        dict with ``'pixel_values'`` and ``'pixel_mask'`` taken from the padded
        encoding and ``'labels'`` holding the list of targets.
    """
    images = []
    targets = []
    for sample in batch:
        images.append(sample[0])
        targets.append(sample[1])

    padded = image_processor.pad(images, return_tensors="pt")
    return {
        'pixel_values': padded['pixel_values'],
        'pixel_mask': padded['pixel_mask'],
        'labels': targets
    }

# One dataset per split; all three share the same processor and label mapping.
train_dataset = YOLOv8Dataset(
    images_folder="/kaggle/input/yolovmawa/train1",
    labels_folder="/kaggle/input/yolovmawa/train-output-labels",
    image_processor=image_processor,
    id2label=id2label,
)
val_dataset = YOLOv8Dataset(
    images_folder="/kaggle/input/yolovmawa/valid1",
    labels_folder="/kaggle/input/yolovmawa/output-labels",
    image_processor=image_processor,
    id2label=id2label,
)
test_dataset = YOLOv8Dataset(
    images_folder="/kaggle/input/yolovmawa/test1",
    labels_folder="/kaggle/input/yolovmawa/testpost/output",
    image_processor=image_processor,
    id2label=id2label,
)

# Only the training loader shuffles; every loader batches 4 samples through
# collate_fn using 3 worker processes.
train_dataloader = DataLoader(
    dataset=train_dataset,
    collate_fn=collate_fn,
    batch_size=4,
    shuffle=True,
    num_workers=3,
)
val_dataloader = DataLoader(
    dataset=val_dataset,
    collate_fn=collate_fn,
    batch_size=4,
    num_workers=3,
)
test_dataloader = DataLoader(
    dataset=test_dataset,
    collate_fn=collate_fn,
    batch_size=4,
    num_workers=3,
)


In [None]:
class Detr(pl.LightningModule):
    """Lightning module that fine-tunes the DETR checkpoint named by ``CHECKPOINT``.

    The wrapped model is created with ``num_labels=len(id2label)`` and
    ``ignore_mismatched_sizes=True``. Parameters whose name contains
    ``"backbone"`` are optimized with ``lr_backbone``; all others use ``lr``.

    Args:
        lr: Learning rate for the non-backbone parameters.
        lr_backbone: Learning rate for the backbone parameters.
        weight_decay: Weight decay passed to AdamW.
    """

    def __init__(self, lr, lr_backbone, weight_decay):
        super().__init__()
        self.lr = lr
        self.lr_backbone = lr_backbone
        self.weight_decay = weight_decay
        self.model = DetrForObjectDetection.from_pretrained(
            pretrained_model_name_or_path=CHECKPOINT,
            num_labels=len(id2label),
            ignore_mismatched_sizes=True,
        )

    def forward(self, pixel_values, pixel_mask):
        """Run the wrapped model without labels and return its outputs."""
        return self.model(pixel_values=pixel_values, pixel_mask=pixel_mask)

    def common_step(self, batch, batch_idx):
        """Run the model with labels and return ``(loss, loss_dict)``."""
        images = batch["pixel_values"]
        mask = batch["pixel_mask"]
        # Every tensor of every per-image target is moved to the module's device.
        targets = [
            {key: tensor.to(self.device) for key, tensor in target.items()}
            for target in batch["labels"]
        ]

        outputs = self.model(pixel_values=images, pixel_mask=mask, labels=targets)
        return outputs.loss, outputs.loss_dict

    def training_step(self, batch, batch_idx):
        """Log the total loss and each loss component, then return the loss."""
        loss, components = self.common_step(batch, batch_idx)
        self.log("training_loss", loss)
        for name, value in components.items():
            self.log("train_" + name, value.item())
        return loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss and each loss component, then return the loss."""
        loss, components = self.common_step(batch, batch_idx)
        self.log("validation/loss", loss)
        for name, value in components.items():
            self.log("validation_" + name, value.item())
        return loss

    def configure_optimizers(self):
        """Create AdamW with a separate learning rate for backbone parameters."""
        head_params = []
        backbone_params = []
        for name, param in self.named_parameters():
            if not param.requires_grad:
                # Frozen parameters are left out of both groups.
                continue
            if "backbone" in name:
                backbone_params.append(param)
            else:
                head_params.append(param)

        param_groups = [
            {"params": head_params},
            {"params": backbone_params, "lr": self.lr_backbone},
        ]
        return torch.optim.AdamW(param_groups, lr=self.lr, weight_decay=self.weight_decay)

# Rebinds `model` to the Lightning wrapper that will be trained.
model = Detr(lr=1e-4, lr_backbone=1e-5, weight_decay=1e-4)

from pytorch_lightning import Trainer

# Single-GPU run: gradients are clipped at 0.1 and accumulated over 8 batches.
trainer = Trainer(
    devices=1,
    accelerator="gpu",
    max_epochs=60,
    gradient_clip_val=0.1,
    accumulate_grad_batches=8,
    log_every_n_steps=5,
)
trainer.fit(model, train_dataloader, val_dataloader)



In [None]:
import os

# Report whether the test image directory exists and how many .jpg files it holds
test_image_dir = "/kaggle/input/yolovmawa/test1"
if os.path.exists(test_image_dir):
    test_image_paths = glob.glob(os.path.join(test_image_dir, "*.jpg"))
    if test_image_paths:
        print(f"Found {len(test_image_paths)} images in the test directory.")
    else:
        print(f"No images found in {test_image_dir} directory.")
else:
    print(f"Directory {test_image_dir} does not exist.")


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

def calculate_metrics(predictions, ground_truths, iou_threshold=0.5):
    """Compute detection precision, recall and F1 by IoU matching.

    For every image, each predicted box is matched to the not-yet-matched
    ground-truth box of the same label with the highest IoU. The match is a
    true positive when that IoU reaches ``iou_threshold``; otherwise the
    prediction is a false positive. Ground-truth boxes left unmatched are false
    negatives. Counts are accumulated over all images before the ratios are
    taken. Predictions are processed in the order given.

    The inputs are never modified, and boxes/labels may be any indexable
    sequences (lists or arrays).

    NOTE: a later cell in this notebook defines another function that is also
    named ``calculate_metrics``; once that cell has run, the name refers to the
    later definition.

    Args:
        predictions: Iterable of dicts with ``'boxes'`` (``[x, y, w, h]``
            entries) and ``'labels'``.
        ground_truths: Iterable of dicts with ``'boxes'`` and ``'labels'`` in
            the same format, aligned with ``predictions``.
        iou_threshold: Minimum IoU for a prediction to count as a match.

    Returns:
        ``(precision, recall, f1)``; a ratio whose denominator is zero is 0.
    """
    tp = 0
    fp = 0
    fn = 0

    for prediction, ground_truth in zip(predictions, ground_truths):
        pred_boxes = prediction['boxes']
        pred_labels = prediction['labels']

        gt_boxes = ground_truth['boxes']
        gt_labels = ground_truth['labels']

        # Track matched ground truths by index. Removing entries from gt_boxes
        # would mutate the caller's data and shift boxes out of step with
        # gt_labels, pairing later boxes with the wrong label.
        matched_gt = set()

        for pred_box, pred_label in zip(pred_boxes, pred_labels):
            best_iou = 0
            best_idx = None

            for gt_idx, (gt_box, gt_label) in enumerate(zip(gt_boxes, gt_labels)):
                if gt_idx in matched_gt or pred_label != gt_label:
                    continue
                iou = compute_iou(pred_box, gt_box)
                if iou > best_iou:
                    best_iou = iou
                    best_idx = gt_idx

            if best_idx is not None and best_iou >= iou_threshold:
                tp += 1
                matched_gt.add(best_idx)
            else:
                fp += 1

        fn += len(gt_boxes) - len(matched_gt)

    precision = tp / (tp + fp) if tp + fp > 0 else 0
    recall = tp / (tp + fn) if tp + fn > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0

    return precision, recall, f1

def compute_iou(box1, box2):
    """Return the intersection-over-union of two ``[x, y, w, h]`` boxes.

    ``x, y`` is the corner with the smallest coordinates and ``w, h`` the
    extent along each axis. Returns 0 when the union area is zero.
    """
    x1, y1, w1, h1 = box1
    x2, y2, w2, h2 = box2

    # Corners of the overlap rectangle (may be inverted when boxes are disjoint).
    xi1 = max(x1, x2)
    yi1 = max(y1, y2)
    xi2 = min(x1 + w1, x2 + w2)
    yi2 = min(y1 + h1, y2 + h2)

    # Clamp each side at 0 so disjoint boxes contribute no intersection.
    inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
    box1_area = w1 * h1
    box2_area = w2 * h2

    union_area = box1_area + box2_area - inter_area
    iou = inter_area / union_area if union_area != 0 else 0

    return iou


In [None]:
import torch
from tqdm.notebook import tqdm
from sklearn.metrics import precision_recall_fscore_support

# Rebinds DEVICE and CONFIDENCE_THRESHOLD, which the settings cell near the top
# of the notebook also defines. The device string here is 'cuda', whereas the
# settings cell uses 'cuda:0'.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# NOTE(review): presumably the minimum score for a detection to be kept -- confirm against its users.
CONFIDENCE_THRESHOLD = 0.5

def calculate_metrics(y_true, y_pred):
    """Return weighted-average precision, recall and F1 for two label sequences.

    Thin wrapper around ``precision_recall_fscore_support`` called with
    ``average='weighted'`` and ``zero_division=0``; the fourth value it returns
    is discarded.

    NOTE: this definition replaces the IoU-based ``calculate_metrics`` declared
    in an earlier cell of this notebook.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.

    Returns:
        ``(precision, recall, f1)``.
    """
    scores = precision_recall_fscore_support(
        y_true,
        y_pred,
        average='weighted',
        zero_division=0,
    )
    precision, recall, f1 = scores[:3]
    return precision, recall, f1

def _normalized_cxcywh_to_xyxy(boxes, orig_size):
    """Convert normalized (cx, cy, w, h) boxes to absolute (x_min, y_min, x_max, y_max).

    ``orig_size`` holds ``(height, width)`` of the original image.
    """
    boxes = boxes.reshape(-1, 4).float()
    height, width = (float(v) for v in orig_size.tolist())
    cx, cy, w, h = boxes.unbind(dim=1)
    corners = torch.stack([cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2], dim=1)
    scale = torch.tensor([width, height, width, height], dtype=corners.dtype)
    return corners * scale


def _pairwise_iou_xyxy(boxes_a, boxes_b):
    """Return the (len(a), len(b)) IoU matrix for two sets of corner-format boxes."""
    area_a = (boxes_a[:, 2] - boxes_a[:, 0]).clamp(min=0) * (boxes_a[:, 3] - boxes_a[:, 1]).clamp(min=0)
    area_b = (boxes_b[:, 2] - boxes_b[:, 0]).clamp(min=0) * (boxes_b[:, 3] - boxes_b[:, 1]).clamp(min=0)
    top_left = torch.max(boxes_a[:, None, :2], boxes_b[None, :, :2])
    bottom_right = torch.min(boxes_a[:, None, 2:], boxes_b[None, :, 2:])
    overlap = (bottom_right - top_left).clamp(min=0)
    inter = overlap[..., 0] * overlap[..., 1]
    union = area_a[:, None] + area_b[None, :] - inter
    # Degenerate (zero-area) pairs get IoU 0 instead of a division by zero.
    return torch.where(union > 0, inter / union.clamp(min=1e-12), torch.zeros_like(inter))


def _count_detection_matches(pred_boxes, pred_labels, pred_scores, gt_boxes, gt_labels, iou_threshold):
    """Greedily match predictions to ground truths and return ``(tp, fp, fn)``.

    Predictions are visited from highest to lowest score; each may claim the
    unmatched same-class ground truth with the highest IoU, provided that IoU
    reaches ``iou_threshold``.
    """
    num_pred = pred_boxes.shape[0]
    num_gt = gt_boxes.shape[0]
    if num_pred == 0 or num_gt == 0:
        return 0, num_pred, num_gt

    iou = _pairwise_iou_xyxy(pred_boxes, gt_boxes)
    same_class = pred_labels[:, None] == gt_labels[None, :]
    gt_taken = [False] * num_gt
    tp = 0
    for pred_idx in torch.argsort(pred_scores, descending=True).tolist():
        best_iou = -1.0
        best_gt = None
        for gt_idx in range(num_gt):
            if gt_taken[gt_idx] or not bool(same_class[pred_idx, gt_idx]):
                continue
            value = float(iou[pred_idx, gt_idx])
            if value > best_iou:
                best_iou = value
                best_gt = gt_idx
        if best_gt is not None and best_iou >= iou_threshold:
            gt_taken[best_gt] = True
            tp += 1
    return tp, num_pred - tp, num_gt - tp


def evaluate_custom(test_dataloader, model, image_processor, iou_threshold=0.5):
    """Evaluate a detector on ``test_dataloader`` and print averaged metrics.

    For every image, predictions are matched to ground-truth boxes by class
    and IoU, giving per-image precision, recall and F1 that are averaged over
    the images. Earlier, this function compared the ground-truth and predicted
    label arrays position by position, which fails whenever the number of
    predicted boxes differs from the number of ground-truth boxes and ignores
    box positions altogether.

    Coordinate handling (per the Transformers DETR documentation): the targets
    produced by the image processor store boxes as normalized
    (center_x, center_y, width, height) and ``orig_size`` as (height, width);
    ``post_process_object_detection`` returns absolute
    (x_min, y_min, x_max, y_max) boxes scaled to ``target_sizes``. Ground truth
    is converted to the latter format before matching.

    Images with no predictions are reported with a message. They count as
    zero precision/recall/F1 when they contain ground truth (missed
    detections), and are skipped when they contain none.

    Args:
        test_dataloader: Iterable of batches with ``"pixel_values"``,
            ``"pixel_mask"`` and ``"labels"`` (list of per-image target dicts
            with ``"orig_size"``, ``"boxes"`` and ``"class_labels"``).
        model: Callable module accepting ``pixel_values`` and ``pixel_mask``.
        image_processor: Object providing ``post_process_object_detection``.
        iou_threshold: Minimum IoU for a prediction to match a ground truth.

    Returns:
        None. Results are printed.
    """
    all_precisions = []
    all_recalls = []
    all_f1s = []

    # Move model to the device (GPU or CPU) and switch off training-only
    # behaviour such as dropout so predictions are deterministic.
    model.to(DEVICE)
    model.eval()

    image_idx = 0  # running index over images, not batches
    for batch in tqdm(test_dataloader):
        pixel_values = batch["pixel_values"].to(DEVICE)
        pixel_mask = batch["pixel_mask"].to(DEVICE)
        labels = [{k: v.to(DEVICE) for k, v in t.items()} for t in batch["labels"]]

        with torch.no_grad():
            outputs = model(pixel_values=pixel_values, pixel_mask=pixel_mask)

        orig_target_sizes = torch.stack([target["orig_size"] for target in labels], dim=0)
        results = image_processor.post_process_object_detection(
            outputs,
            target_sizes=orig_target_sizes,
            threshold=CONFIDENCE_THRESHOLD,
        )

        for target, result in zip(labels, results):
            pred_boxes = result["boxes"].detach().cpu().float().reshape(-1, 4)
            pred_labels = result["labels"].detach().cpu()
            pred_scores = result["scores"].detach().cpu()

            gt_boxes = _normalized_cxcywh_to_xyxy(target["boxes"].detach().cpu(), target["orig_size"].detach().cpu())
            gt_labels = target["class_labels"].detach().cpu()

            current_idx = image_idx
            image_idx += 1

            if len(pred_labels) == 0:
                print(f"No predictions for image {current_idx}")
                if len(gt_labels) == 0:
                    # Nothing predicted and nothing to find: no score to record.
                    continue

            tp, fp, fn = _count_detection_matches(
                pred_boxes, pred_labels, pred_scores, gt_boxes, gt_labels, iou_threshold
            )
            precision = tp / (tp + fp) if tp + fp > 0 else 0.0
            recall = tp / (tp + fn) if tp + fn > 0 else 0.0
            f1 = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0

            all_precisions.append(precision)
            all_recalls.append(recall)
            all_f1s.append(f1)

    # Compute average metrics
    if all_precisions and all_recalls and all_f1s:
        avg_precision = sum(all_precisions) / len(all_precisions)
        avg_recall = sum(all_recalls) / len(all_recalls)
        avg_f1 = sum(all_f1s) / len(all_f1s)

        print(f"Average Precision: {avg_precision:.4f}, Average Recall: {avg_recall:.4f}, Average F1-score: {avg_f1:.4f}")
    else:
        print("No valid predictions to calculate metrics.")

# Place the model on DEVICE. evaluate_custom also does this itself, so the
# call is redundant but harmless; batches are moved to DEVICE inside
# evaluate_custom, not by the dataloader.
model.to(DEVICE)

# Evaluate the model on the test dataset
evaluate_custom(test_dataloader, model, image_processor)
