# Importing JSON

In [None]:
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import json
import torch
import random
import tensorflow as tf

In [None]:
# Path of the JSON annotation file inspected by the cells below.
annotations_file = 'path/Train_dataset.json'

In [None]:
import json

# Load the annotations; the context manager closes the file handle promptly
# instead of leaving it open until garbage collection.
with open(annotations_file) as f:
    data = json.load(f)

# Collect the distinct class names found across all annotated objects
unique_classes = {
    obj['class']
    for image in data['images']
    for obj in image['objects']
}
print("Unique Classes:", unique_classes)


In [None]:
from collections import Counter

# Count how many annotated object instances each class has in the JSON
class_counts = Counter(
    obj["class"]
    for image in data["images"]
    for obj in image["objects"]
)

class_counts

# Pocosi_24042024_100epocs

> Add blockquote



In [None]:
import torch
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.transforms import functional as F
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import GradScaler, autocast
from PIL import Image
import os
import json
import torchvision.transforms as transforms

# CUDA caching-allocator configuration. The variable holds a single string, so
# assigning it a second time replaces the earlier value rather than adding to
# it; several options have to be comma-joined inside one string
# (e.g. 'max_split_size_mb:128,expandable_segments:True').
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

# Path of the JSON annotation file used for training
annotations_file = 'path/Train_dataset.json'

def class_text_to_int(class_text):
    """Translate a class name into its integer label id.

    Returns None when ``class_text`` is not one of the known class names.
    """
    known_classes = (
        "dump_truck",
        "person",
        "excavator",
        "loader",
        "mixer_truck",
        "steamroller",
    )
    # Ids are assigned in listing order, starting at 1.
    label_ids = {name: idx for idx, name in enumerate(known_classes, start=1)}
    return label_ids.get(class_text)


class CustomDataset(Dataset):
    """Object-detection dataset backed by a JSON annotation file.

    The JSON is read as a dict with an 'images' list. Each entry provides an
    'image_path' (joined onto ``img_dir``) and an 'objects' list whose items
    carry a 'class' name and a 'bbox' dict with the keys 'x_min', 'y_min',
    'x_max' and 'y_max'.

    Args:
        annotations_file: Path to the JSON annotation file.
        img_dir: Directory the per-image 'image_path' values are relative to.
        transform: Optional callable applied to the loaded PIL image. When
            None, ``transforms.ToTensor()`` is used.
    """

    def __init__(self, annotations_file, img_dir, transform=None):
        with open(annotations_file) as f:
            self.img_labels = json.load(f)
        self.img_dir = img_dir
        self.transform = transform if transform is not None else transforms.Compose([
            transforms.ToTensor(),  # Converts PIL images to PyTorch tensors
        ])

    def __len__(self):
        """Return the number of annotated images."""
        return len(self.img_labels['images'])

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the sample at ``idx``.

        ``target`` is a dict with 'boxes' (float32, shape (N, 4), ordered as
        [x_min, y_min, x_max, y_max]) and 'labels' (int64, shape (N,)).

        Raises:
            ValueError: If an object's class name has no integer mapping.
        """
        img_info = self.img_labels['images'][idx]
        img_path = os.path.join(self.img_dir, img_info['image_path'])
        image = Image.open(img_path).convert("RGB")
        # Apply transformations
        if self.transform:
            image = self.transform(image)

        boxes = []
        labels = []
        for obj in img_info['objects']:
            bbox = obj['bbox']
            # Read the coordinates by key so the result does not depend on the
            # order in which the keys happen to appear in the JSON file.
            boxes.append([bbox['x_min'], bbox['y_min'], bbox['x_max'], bbox['y_max']])
            label = class_text_to_int(obj['class'])
            if label is None:
                # Fail with a clear message instead of a tensor-conversion error.
                raise ValueError(
                    f"Unknown class {obj['class']!r} in annotations for {img_path}"
                )
            labels.append(label)

        # reshape keeps the (N, 4) layout even when the image has no objects,
        # where as_tensor([]) alone would produce shape (0,).
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        target = {"boxes": boxes, "labels": labels}
        return image, target


def get_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) with a box head for ``num_classes`` classes.

    The detector is created with ``FasterRCNN_ResNet50_FPN_Weights.DEFAULT``
    and its box predictor is then replaced by a new ``FastRCNNPredictor``
    sized for ``num_classes``.
    """
    detector = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT)
    # The replacement head must accept the same feature width as the old one.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    new_head = FastRCNNPredictor(head_in_features, num_classes)
    detector.roi_heads.box_predictor = new_head
    return detector


def collate_fn(batch):
    """Transpose a batch of samples into a tuple of per-field tuples.

    A batch of ``(image, target)`` pairs becomes ``(images, targets)``.
    """
    columns = zip(*batch)
    return tuple(columns)


def train_one_epoch(model, optimizer, data_loader, device, epoch, accumulation_steps=4):
    """Train ``model`` for one pass over ``data_loader`` with gradient accumulation.

    Each batch is run under ``autocast``; the summed loss is divided by
    ``accumulation_steps`` and back-propagated through a ``GradScaler``. The
    optimizer steps every ``accumulation_steps`` batches and on the last
    batch. When the pass ends, the mean (undivided) batch loss is printed.

    Args:
        model: Module called as ``model(images, targets)`` that returns a dict
            of loss tensors.
        optimizer: Optimizer over the model's parameters.
        data_loader: Sized iterable yielding ``(images, targets)`` batches,
            where ``targets`` is a sequence of dicts of tensors.
        device: Device the images and target tensors are moved to.
        epoch: Epoch number, used only in the log line.
        accumulation_steps: Number of batches whose gradients are accumulated
            before each optimizer step.
    """
    model.train()
    scaler = GradScaler()
    optimizer.zero_grad()
    num_batches = len(data_loader)
    loss_total = 0.0
    for i, (images, targets) in enumerate(data_loader):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        with autocast():
            loss_dict = model(images, targets)
            batch_loss = sum(loss for loss in loss_dict.values())
            # Divide so the accumulated gradient matches one larger batch.
            losses = batch_loss / accumulation_steps
        scaler.scale(losses).backward()
        # Track the undivided loss so the log reflects the real loss value.
        loss_total += float(batch_loss.detach())
        if (i + 1) % accumulation_steps == 0 or (i + 1) == num_batches:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
    if num_batches == 0:
        # Nothing was processed, so there is no loss to report.
        print(f"Epoch #{epoch} perdida: n/a")
        return
    print(f"Epoch #{epoch} perdida: {loss_total / num_batches}")

def main():
    """Train the detector for 100 epochs, saving a checkpoint after every epoch."""
    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')
    num_classes = 7

    # transform=None makes the dataset fall back to its default transform
    dataset = CustomDataset(annotations_file, 'path/Images_etiquetadas', transform=None)
    data_loader = DataLoader(
        dataset,
        batch_size=24,
        shuffle=True,
        collate_fn=collate_fn,
        num_workers=20,
        pin_memory=True,
    )

    model = get_model(num_classes).to(device)
    # Alternative optimizer tried: torch.optim.Adam(model.parameters(), lr=0.001)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.005, momentum=0.9)

    num_epochs = 100
    for epoch in range(num_epochs):
        train_one_epoch(model, optimizer, data_loader, device, epoch)
        checkpoint_path = f'path/model_epoch_{epoch}.pth'
        torch.save(model.state_dict(), checkpoint_path)


if __name__ == "__main__":
    main()


# Validation Section

In [None]:
## Model evaluation on the test dataset

import torch
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.transforms import functional as F
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import json
import torchvision.transforms as transforms
import numpy as np


def collate_fn(batch):
    """Regroup a list of samples field by field.

    ``[(image, target), ...]`` is turned into ``(images, targets)``, each a tuple.
    """
    return (*zip(*batch),)

def class_text_to_int(class_text):
    """Look up the integer label id of a class name.

    Returns None for a name that is not in the mapping.
    """
    class_mapping = dict(
        dump_truck=1,
        person=2,
        excavator=3,
        loader=4,
        mixer_truck=5,
        steamroller=6,
    )
    if class_text in class_mapping:
        return class_mapping[class_text]
    return None

class CustomDataset(Dataset):
    """Object-detection dataset backed by a JSON annotation file.

    The JSON is read as a dict with an 'images' list. Each entry provides an
    'image_path' (joined onto ``img_dir``) and an 'objects' list whose items
    carry a 'class' name and a 'bbox' dict with the keys 'x_min', 'y_min',
    'x_max' and 'y_max'.

    Args:
        annotations_file: Path to the JSON annotation file.
        img_dir: Directory the per-image 'image_path' values are relative to.
        transform: Optional callable applied to the loaded PIL image. When
            None, ``transforms.ToTensor()`` is used.
    """

    def __init__(self, annotations_file, img_dir, transform=None):
        with open(annotations_file) as f:
            self.img_labels = json.load(f)
        self.img_dir = img_dir
        self.transform = transform if transform is not None else transforms.Compose([
            transforms.ToTensor(),
        ])

    def __len__(self):
        """Return the number of annotated images."""
        return len(self.img_labels['images'])

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the sample at ``idx``.

        ``target`` is a dict with 'boxes' (float32, shape (N, 4), ordered as
        [x_min, y_min, x_max, y_max]) and 'labels' (int64, shape (N,)).

        Raises:
            ValueError: If an object's class name has no integer mapping.
        """
        img_info = self.img_labels['images'][idx]
        img_path = os.path.join(self.img_dir, img_info['image_path'])
        image = Image.open(img_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        boxes = []
        labels = []
        for obj in img_info['objects']:
            bbox = obj['bbox']
            boxes.append([bbox['x_min'], bbox['y_min'], bbox['x_max'], bbox['y_max']])
            label = class_text_to_int(obj['class'])
            if label is None:
                # Fail with a clear message instead of a tensor-conversion error.
                raise ValueError(
                    f"Unknown class {obj['class']!r} in annotations for {img_path}"
                )
            labels.append(label)

        # reshape keeps the (N, 4) layout even when the image has no objects,
        # where as_tensor([]) alone would produce shape (0,).
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        target = {"boxes": boxes, "labels": labels}

        return image, target


def bbox_iou(box1, box2):
    """Return the intersection-over-union of two boxes.

    Args:
        box1: Indexable ``[x_min, y_min, x_max, y_max]``.
        box2: Indexable ``[x_min, y_min, x_max, y_max]``.

    Returns:
        The IoU ratio, or 0.0 when the union area is not positive (for
        example when both boxes are degenerate), so no division by zero
        occurs.
    """
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])
    # Clamp at zero so non-overlapping boxes yield an empty intersection.
    inter_area = max(x2 - x1, 0) * max(y2 - y1, 0)
    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union_area = box1_area + box2_area - inter_area
    if union_area <= 0:
        return 0.0
    return inter_area / float(union_area)

def get_model(num_classes):
    """Create a Faster R-CNN (ResNet-50 FPN) whose box predictor outputs ``num_classes`` classes.

    The network starts from ``FasterRCNN_ResNet50_FPN_Weights.DEFAULT``; only
    the box predictor is swapped for a new ``FastRCNNPredictor``.
    """
    pretrained_weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
    net = fasterrcnn_resnet50_fpn(weights=pretrained_weights)
    roi_heads = net.roi_heads
    # Keep the input width of the existing head for the replacement.
    feature_width = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(feature_width, num_classes)
    return net

# Select the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Path of the trained checkpoint (a state dict) to evaluate
Pocosi_24042024_30epocs = 'path/model.pth'
model = get_model(num_classes=7)
model.load_state_dict(torch.load(Pocosi_24042024_30epocs, map_location=device))
model = model.to(device)
model.eval()

# Test dataset and loader: one image per batch, in file order
test_annotations_file = 'path/test_dataset.json'
images_path = 'path/Images_etiquetadas'
test_dataset = CustomDataset(test_annotations_file, images_path)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

ious = []
true_positives, false_positives, false_negatives = 0, 0, 0
iou_threshold = 0.7

for images, targets in test_loader:
    images = [image.to(device) for image in images]
    # Inference only: disabling autograd avoids building a graph for every
    # forward pass and the memory that graph would hold.
    with torch.no_grad():
        outputs = model(images)
    for i, output in enumerate(outputs):
        predicted_boxes = output['boxes'].cpu().numpy()
        predicted_labels = output['labels'].cpu().numpy()
        target_boxes = targets[i]['boxes'].cpu().numpy()
        target_labels = targets[i]['labels'].cpu().numpy()

        # Greedy matching: each prediction claims the first still-unmatched
        # ground-truth box whose IoU exceeds the threshold.
        matched_gt = set()
        for pb_idx, predicted_box in enumerate(predicted_boxes):
            for tb_idx, target_box in enumerate(target_boxes):
                iou = bbox_iou(predicted_box, target_box)
                if iou > iou_threshold and tb_idx not in matched_gt:
                    matched_gt.add(tb_idx)
                    ious.append(iou)
                    # A located box only counts as correct if its class agrees.
                    if predicted_labels[pb_idx] == target_labels[tb_idx]:
                        true_positives += 1
                    else:
                        false_positives += 1
                    break
            else:
                # The prediction matched no ground-truth box at all.
                false_positives += 1
        # Ground-truth boxes that no prediction claimed are misses.
        false_negatives += len(target_boxes) - len(matched_gt)

# Guard every ratio against an empty denominator
average_iou = sum(ious) / len(ious) if ious else 0
precision = true_positives / (true_positives + false_positives) if true_positives + false_positives > 0 else 0
recall = true_positives / (true_positives + false_negatives) if true_positives + false_negatives > 0 else 0

print(f"Average IoU: {average_iou:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")


In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Helper for plotting boxes on an image
def draw_boxes(ax, boxes, labels, color):
    """Draw every box on ``ax`` as an unfilled rectangle with its label text.

    ``boxes`` holds ``(x1, y1, x2, y2)`` entries paired one-to-one with
    ``labels``; each label is written at the box's ``(x1, y1)`` corner.
    """
    for (left, top, right, bottom), caption in zip(boxes, labels):
        outline = patches.Rectangle(
            (left, top),
            right - left,
            bottom - top,
            linewidth=2,
            edgecolor=color,
            facecolor='none',
        )
        ax.add_patch(outline)
        ax.text(
            left,
            top,
            caption,
            verticalalignment='bottom',
            horizontalalignment='left',
            color=color,
            fontsize=12,
        )

# Visualization figure: one subplot per image, stacked in a single column
max_images = 20
fig, axes = plt.subplots(max_images, 1, figsize=(30, 50))
image_counter = 0

test_annotations_file = 'path/test_dataset.json'


# Load the data
test_dataset = CustomDataset(test_annotations_file, images_path, transform=None)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

for images, targets in test_loader:
    images = [image.to(device) for image in images]
    # Inference only: no autograd graph is needed for plotting predictions.
    with torch.no_grad():
        outputs = model(images)

    for i, image in enumerate(images):
        if image_counter >= max_images:
            break

        ax = axes[image_counter]

        img = F.to_pil_image(image.cpu())
        ax.imshow(img)

        # Ground-truth boxes
        target_boxes = targets[i]['boxes'].cpu().numpy()
        target_labels = [str(label) for label in targets[i]['labels'].cpu().numpy()]
        draw_boxes(ax, target_boxes, target_labels, 'blue')

        # Predicted boxes
        predicted_boxes = outputs[i]['boxes'].cpu().numpy()
        predicted_scores = outputs[i]['scores'].cpu().numpy()
        predicted_labels = [str(label) for label in outputs[i]['labels'].cpu().numpy()]

        # Only draw predictions whose score is above 0.7
        high_score_idxs = [idx for idx, score in enumerate(predicted_scores) if score > 0.7]
        predicted_boxes = predicted_boxes[high_score_idxs]
        predicted_labels = [predicted_labels[idx] for idx in high_score_idxs]

        draw_boxes(ax, predicted_boxes, predicted_labels, 'red')

        ax.axis('off')
        image_counter += 1

    if image_counter >= max_images:
        break

# Hide the subplots left over when the dataset has fewer images than slots
for unused_ax in axes[image_counter:]:
    unused_ax.axis('off')

plt.tight_layout()
plt.show()

'''
 "dump_truck": 1,
        "person": 2,
        "excavator": 3,
        "loader": 4,
        "mixer_truck": 5,
        "steamroller": 6'''