In [None]:
import fiftyone as fo
import fiftyone.zoo as foz
import fiftyone.core.labels as fol
import os
import json
import shutil
from PIL import Image 
from fiftyone import ViewField as F
import time

## CREATING ENVIRONMENT VARIABLES FOR FIFTYONE

In [None]:
import os

# Register the notebook's directory settings in the process environment.
# The values are placeholders to be replaced with real paths before running.
os.environ.update({
    'FIFTYONE_DIR': 'your_desired_fiftyone_directory',
    'IMAGES_DIR': 'your_desired_images_directory',
    'ANNOTATIONS_DIR': 'your_desired_annotations_directory',
})


In [None]:
# Read the directory settings back from the environment; each falls back to a
# placeholder default when the variable is not set.
fiftyone_dir = os.environ.get('FIFTYONE_DIR', 'default_fiftyone_directory')
images_dir = os.environ.get('IMAGES_DIR', 'default_images_directory')
annotations_dir = os.environ.get('ANNOTATIONS_DIR', 'default_annotations_directory')

## DATA PREPARATION

In [None]:
# Per-split boolean flags, all initially off.
# NOTE(review): none of these is read anywhere else in the visible notebook.
train_flag = val_flag = test_flag = False

In [None]:
def normalize_bbox(bbox, image_width, image_height):
    """Convert an ``[x_min, y_min, width, height]`` box to normalized center format.

    Args:
        bbox: Four values ``(x_min, y_min, width, height)`` in the same units
            as ``image_width`` / ``image_height``.
        image_width: Width used to scale the horizontal values.
        image_height: Height used to scale the vertical values.

    Returns:
        A tuple ``(x_center, y_center, width, height)`` where the horizontal
        values are divided by ``image_width`` and the vertical values by
        ``image_height``.
    """
    left, top, box_w, box_h = bbox
    return (
        (left + box_w / 2) / image_width,
        (top + box_h / 2) / image_height,
        box_w / image_width,
        box_h / image_height,
    )

def verify_file_exists(filepath):
    """Return True when ``filepath`` is an existing regular file.

    When it is not, a warning naming the path is printed and False is returned.
    """
    if os.path.isfile(filepath):
        return True
    print(f"Warning: File not found - {filepath}")
    return False

In [None]:
# Build the person-only dataset. For every COCO-2017 split the images are copied
# into ./datasets/coco-human-dataset/images/<split>/images/; for train and
# validation, YOLO label files and a COCO-style annotation JSON are written too.
# NOTE(review): the validation split is written under `.../images/validation/`,
# while later cells of this notebook read `.../images/val/...` - confirm which
# directory layout is intended.
for split in ['validation', 'train', 'test']:
    target_image_dir = f'./datasets/coco-human-dataset/images/{split}/images/'
    target_label_dir = f'./datasets/coco-human-dataset/images/{split}/labels/'
    target_ann_dir = f'./datasets/coco-human-dataset/images/{split}/{split}.json'

    os.makedirs(target_image_dir, exist_ok=True)
    os.makedirs(target_label_dir, exist_ok=True)

    print('Processing ' + split)

    dataset = foz.load_zoo_dataset(
        "coco-2017",
        split=split,
        label_types=["detections", "segmentations"],
        include_id=True,
        seed=42,
        dataset_name=f"{split}-custom"
    )
    # Single-category COCO skeleton that is filled in per split.
    coco_annotations = {
        "images": [],
        "annotations": [],
        "categories": [
            {"id": 1, "name": "person"},
        ],
    }
    annotation_id = 1

    if split == 'train':
        # `fo.ViewField` is used rather than the `F` alias because a later cell
        # rebinds `F` to torchvision's functional module, which would break a
        # re-run of this cell.
        filtered_view = dataset.filter_labels("detections", fo.ViewField("label") == "person")
        print(f"Number of samples with 'person' labels: {len(filtered_view)}")

        for sample in filtered_view:
            image_path = sample.filepath
            detections = sample.detections.detections

            if detections is None or len(detections) == 0:
                print(f"Skipping {image_path} - no detections")
                continue

            # Only the size is needed; close the file handle right away.
            with Image.open(image_path) as image:
                image_width, image_height = image.size

            # Copy image to output directory
            shutil.copy(image_path, target_image_dir)

            # Extract numeric ID from the file name (e.g., 000000000036.jpg -> 36)
            file_name = os.path.basename(image_path)
            numeric_id = int(file_name.split('.')[0])

            # Add image info to COCO annotations
            coco_annotations["images"].append({
                "id": numeric_id,
                "file_name": file_name,
                "width": image_width,
                "height": image_height,
            })

            # Add detections to COCO annotations and collect YOLO label lines
            yolo_labels = []
            for detection in detections:
                if detection.label != "person":
                    continue

                # The code scales these values by the image size, i.e. it treats
                # them as relative [x_min, y_min, width, height].
                bbox = detection.bounding_box
                x_min = bbox[0] * image_width
                y_min = bbox[1] * image_height
                box_width = bbox[2] * image_width
                box_height = bbox[3] * image_height

                x_center, y_center, width, height = normalize_bbox(
                    (x_min, y_min, box_width, box_height),
                    image_width, image_height
                )
                yolo_labels.append(f"0 {x_center} {y_center} {width} {height}")

                coco_annotations["annotations"].append({
                    "id": annotation_id,
                    "image_id": numeric_id,
                    "category_id": 1,
                    "bbox": [x_min, y_min, box_width, box_height],
                    "area": bbox[2] * image_width * bbox[3] * image_height,
                    "iscrowd": 0,
                })
                annotation_id += 1

            # Write YOLO labels to file (same base name as the image, .txt extension)
            label_path = os.path.join(target_label_dir, os.path.splitext(file_name)[0] + '.txt')
            with open(label_path, 'w') as f:
                f.write("\n".join(yolo_labels))

        # Save COCO annotations to file
        with open(target_ann_dir, 'w') as f:
            json.dump(coco_annotations, f)

    elif split == 'test':
        # Test images are copied as-is; no labels are written for them.
        for sample in dataset:
            shutil.copy(sample.filepath, target_image_dir)

    elif split == 'validation':
        # NOTE(review): this rebinds the notebook-level `images_dir` that was read
        # from the IMAGES_DIR environment variable in an earlier cell.
        images_dir = os.path.join(fiftyone_dir, 'validation', 'data')
        annotations_path = os.path.join(fiftyone_dir, 'raw', 'instances_val2017.json')
        with open(annotations_path, "r") as f:
            coco_data = json.load(f)

        # Index image records by id so each image's own file name and size are used.
        image_info_by_id = {img["id"]: img for img in coco_data["images"]}

        # Group person annotations (category_id 1) by image once, instead of
        # re-scanning the full annotation list for every image.
        annotations_by_image = {}
        for ann in coco_data["annotations"]:
            if ann["category_id"] == 1:
                annotations_by_image.setdefault(ann["image_id"], []).append(ann)

        for image_id, annotations in annotations_by_image.items():
            image_info = image_info_by_id[image_id]
            image_filename = image_info["file_name"]
            image_width = image_info["width"]
            image_height = image_info["height"]
            image_path = os.path.join(images_dir, image_filename)

            if not verify_file_exists(image_path):
                continue

            shutil.copy(image_path, target_image_dir)

            # Prepare YOLO format labels, normalised by this image's own size
            yolo_labels = []
            for ann in annotations:
                x_center, y_center, width, height = normalize_bbox(
                    ann["bbox"], image_width, image_height  # bbox: [x_min, y_min, width, height]
                )
                yolo_labels.append(f"0 {x_center} {y_center} {width} {height}")

            # Write YOLO labels to file
            label_path = os.path.join(target_label_dir, os.path.splitext(image_filename)[0] + '.txt')
            with open(label_path, 'w') as f:
                f.write("\n".join(yolo_labels))

            # Add image info to COCO annotations
            coco_annotations["images"].append({
                "id": image_id,
                "file_name": image_filename,
                "width": image_width,
                "height": image_height,
            })

            # Add detections to COCO annotations
            for ann in annotations:
                x_min, y_min, width, height = ann["bbox"]
                coco_annotations["annotations"].append({
                    "id": annotation_id,
                    "image_id": image_id,
                    "category_id": 1,
                    "bbox": [x_min, y_min, width, height],
                    "area": width * height,
                    "iscrowd": 0,
                })
                annotation_id += 1

        # Save COCO annotations for Faster R-CNN
        with open(target_ann_dir, 'w') as f:
            json.dump(coco_annotations, f)

        print("Filtered validation images and labels have been processed.")

## YOLO

In [None]:
from ultralytics import YOLO
import torch

In [None]:
# Report whether PyTorch can see a CUDA device and, if so, which one is active.
cuda_ready = torch.cuda.is_available()
print("CUDA available:", cuda_ready)
if not cuda_ready:
    print("CUDA is not available. Please check your CUDA and PyTorch installation.")
else:
    active_device = torch.cuda.current_device()
    print("CUDA device count:", torch.cuda.device_count())
    print("Current CUDA device:", active_device)
    print("CUDA device name:", torch.cuda.get_device_name(active_device))

In [None]:
# Create the YOLO model from the "yolov9c.yaml" model definition.
# NOTE(review): presumably a YAML definition yields an untrained model (no
# pretrained weights) - confirm against the Ultralytics documentation.
model_yolo = YOLO("yolov9c.yaml")

In [None]:
# Train the YOLO model for a single epoch at image size 640 on device '0'.
# NOTE(review): `data` is an absolute, machine-specific path to dataset.yaml;
# the notebook will not run elsewhere until it is pointed at a local copy.
results = model_yolo.train(data="C:/Users/azerr/AIFI/HumanDetectionAI/dataset.yaml", epochs=1, imgsz=640, device='0')

In [None]:
# Persist the YOLO model to the working directory.
# NOTE(review): the Faster R-CNN training cell at the end of this notebook saves
# its state dict under this same file name - confirm the two do not clash.
model_yolo.save("trained_yolov9c_weights.pt")

## Faster RCNN

In [None]:
import torch
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as T
from torchvision.models.detection import FasterRCNN , fasterrcnn_resnet50_fpn
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
from torchvision.datasets import CocoDetection
from torch.utils.data import DataLoader
from torchvision.datasets import CocoDetection
from torchvision.transforms import functional as F
import matplotlib.pyplot as plt
from PIL import Image


In [None]:
import sys
# Directory appended to the import search path before the `engine` import in the
# next cell. It is left as an empty string here.
# NOTE(review): presumably meant to point at the folder holding engine.py /
# utils.py - confirm and fill in.
detection_module_path = ''
sys.path.append(detection_module_path)

In [None]:
from engine import utils, evaluate

In [None]:
# Run configuration: prefer the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# NOTE(review): the DataLoader cells below pass batch_size=2 directly, and
# picture_range is not read anywhere in the visible notebook.
num_epochs, batch_size, picture_range = 10, 1, 10

In [None]:
# Choose the compute device first: GPU when available, CPU otherwise.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Instantiate the pre-trained Faster R-CNN, place it on that device and switch
# it to evaluation mode for inference.
model = fasterrcnn_resnet50_fpn(pretrained=True)
model.to(device)
model.eval()

# COCO dataset class names, used as a lookup table indexed by the integer label
# the detector predicts (index 0 is '__background__', index 1 is 'person').
# NOTE(review): the 'N/A' entries look like placeholders for label ids that have
# no class - confirm against the COCO category list.
COCO_INSTANCE_CATEGORY_NAMES = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A',
    'N/A', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana',
    'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut',
    'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table', 'N/A',
    'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
    'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book', 'clock',
    'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

def get_person_detections(image, threshold=0.5):
    """Run the global ``model`` on one image and return its 'person' boxes.

    Args:
        image: Image accepted by ``F.to_tensor``.
        threshold: A detection is kept only when its score is strictly
            greater than this value.

    Returns:
        A list of ``[(x_min, y_min), (x_max, y_max)]`` corner pairs, one per
        kept detection, in the order the model produced them.
    """
    batch = F.to_tensor(image).unsqueeze(0).to(device)
    with torch.no_grad():
        output = model(batch)[0]

    label_ids = output['labels'].cpu().numpy()
    corners = output['boxes'].cpu().numpy()
    confidences = output['scores'].cpu().numpy()

    person_boxes = []
    for label_id, (x_min, y_min, x_max, y_max), confidence in zip(label_ids, corners, confidences):
        is_person = COCO_INSTANCE_CATEGORY_NAMES[label_id] == 'person'
        if is_person and confidence > threshold:
            person_boxes.append([(x_min, y_min), (x_max, y_max)])
    return person_boxes

def plot_detections(image, boxes):
    """Display ``image`` with a red, unfilled rectangle drawn for every box.

    Args:
        image: Image to show with ``plt.imshow``.
        boxes: Iterable of ``[(x_min, y_min), (x_max, y_max)]`` corner pairs.
    """
    plt.figure(figsize=(12, 8))
    plt.imshow(image)
    axes = plt.gca()
    for top_left, bottom_right in boxes:
        box_width = bottom_right[0] - top_left[0]
        box_height = bottom_right[1] - top_left[1]
        outline = plt.Rectangle(top_left, box_width, box_height, fill=False, color='red')
        axes.add_patch(outline)
    plt.show()

# Load an image from disk as RGB.
# NOTE(review): `image_path` is an empty placeholder; `Image.open` cannot
# succeed until a real image path is filled in.
image_path = r""
image = Image.open(image_path).convert('RGB')

# Get person detections with a score above 0.5
person_boxes = get_person_detections(image, threshold=0.5)

# Plot the detections on top of the image
plot_detections(image, person_boxes)


In [None]:
import torch
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F
from PIL import Image
import json
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import numpy as np
import time

# Choose the compute device first: GPU when available, CPU otherwise.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Fresh pre-trained Faster R-CNN for the COCO evaluation below, placed on that
# device and switched to evaluation mode.
model = fasterrcnn_resnet50_fpn(pretrained=True)
model.to(device)
model.eval()

def get_predictions(image):
    """Run the global ``model`` on a single image without tracking gradients.

    The image is converted with ``F.to_tensor``, given a leading batch
    dimension and moved to ``device``. The model's output is returned unchanged.
    """
    batch = F.to_tensor(image).unsqueeze(0).to(device)
    with torch.no_grad():
        return model(batch)

def convert_to_coco_format(predictions, image_id):
    """Turn the first entry of a detector output into COCO result records.

    Args:
        predictions: Sequence whose first element is a dict with 'boxes'
            (rows of ``xmin, ymin, xmax, ymax``), 'scores' and 'labels' tensors.
        image_id: Image id stored in every record.

    Returns:
        A list of dicts with keys ``image_id``, ``category_id``, ``bbox``
        (``[xmin, ymin, width, height]``) and ``score``, one per box.
    """
    detections = predictions[0]
    records = []
    for box, score, label in zip(detections['boxes'], detections['scores'], detections['labels']):
        left, top, right, bottom = box
        records.append({
            "image_id": image_id,
            "category_id": label.item(),
            # Width/height are computed on the tensors before conversion to floats.
            "bbox": [left.item(), top.item(), (right - left).item(), (bottom - top).item()],
            "score": score.item(),
        })
    return records

# Load COCO ground-truth annotations for the validation images.
# NOTE(review): both paths below are absolute and machine-specific.
coco_gt = COCO(r"C:\Users\azerr\AIFI\HumanDetectionAI\datasets\coco-human-dataset\images\val\validation.json")
val_images_dir = r"C:\Users\azerr\AIFI\HumanDetectionAI\datasets\coco-human-dataset\images\val\images"

# Get image ids from COCO
image_ids = coco_gt.getImgIds()

# Run inference and collect predictions
all_predictions = []
start_time = time.time()
for image_id in image_ids:
    image_info = coco_gt.loadImgs(image_id)[0]
    image_path = os.path.join(val_images_dir, image_info['file_name'])
    # Convert inside the context manager so the file handle is released.
    with Image.open(image_path) as image_file:
        image = image_file.convert('RGB')
    predictions = get_predictions(image)
    coco_predictions = convert_to_coco_format(predictions, image_id)
    all_predictions.extend(coco_predictions)
# Stop the clock here so the reported time covers inference only, not the
# JSON dump and COCO evaluation that follow.
inference_seconds = time.time() - start_time

# Save predictions to a JSON file
with open('predictions.json', 'w') as f:
    json.dump(all_predictions, f)

if all_predictions:
    # Load predictions
    coco_dt = coco_gt.loadRes('predictions.json')

    # Initialize COCOeval object
    coco_eval = COCOeval(coco_gt, coco_dt, iouType='bbox')

    # Evaluate
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()
else:
    # There is nothing to score when the detector produced no boxes.
    print('No predictions were produced; skipping COCO evaluation.')

print(f'Inference time: {inference_seconds:.2f} seconds')


In [None]:
# Image-only preprocessing for training.
# `CocoDetection(transform=...)` is applied to the image but not to its
# annotations, so a geometric augmentation such as RandomHorizontalFlip would
# mirror the picture while leaving the boxes in place. Only photometric
# augmentation is therefore applied here.
data_transforms = T.Compose([
    T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    T.ToTensor()
])

# Person-only training split written by the data-preparation cell.
train_dataset = CocoDetection(
    root='./datasets/coco-human-dataset/images/train/images',
    annFile='./datasets/coco-human-dataset/images/train/train.json',
    transform=data_transforms
)

In [None]:
# Shuffled mini-batches of two samples from the training set, collated with
# `utils.collate_fn` from the imported `engine` module.
# NOTE(review): the `batch_size` variable defined in an earlier cell is not used
# here; the literal 2 is.
train_loader = DataLoader(
    train_dataset,
    batch_size=2,
    shuffle=True,
    collate_fn=utils.collate_fn
)

In [None]:
# Validation data must be deterministic: only tensor conversion is applied, with
# none of the random training augmentations, so metrics are comparable between
# epochs and the images stay aligned with their ground-truth boxes.
val_dataset = CocoDetection(
    root='./datasets/coco-human-dataset/images/val/images',
    annFile='./datasets/coco-human-dataset/images/val/validation.json',
    transform=T.ToTensor()
)
# Sample order does not affect the evaluation, so shuffling is turned off.
val_loader = DataLoader(
    val_dataset,
    batch_size=2,
    shuffle=False,
    collate_fn=utils.collate_fn
)

In [None]:
# Choose the compute device: GPU when available, CPU otherwise.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pre-trained Faster R-CNN to be fine-tuned, placed on that device. It starts in
# evaluation mode; the training loop switches it to training mode.
model = fasterrcnn_resnet50_fpn(pretrained=True)
model.to(device)
model.eval()

# SGD over the trainable parameters only; the scheduler multiplies the learning
# rate by 0.1 every 5 scheduler steps.
params = [param for param in model.parameters() if param.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

def filter_invalid_boxes(target, device):
    """Drop degenerate boxes from a detection target and move it to ``device``.

    A box ``[xmin, ymin, xmax, ymax]`` is kept only when ``xmax > xmin`` and
    ``ymax > ymin``. The matching entries of 'labels', 'area' and 'iscrowd' are
    filtered with the same mask.

    Args:
        target: Dict with 'boxes' (N x 4 tensor), 'labels', 'area' and
            'iscrowd' (length N). It is updated in place; other keys are
            left untouched.
        device: Device the four filtered tensors are moved to.

    Returns:
        The same ``target`` dict. When no box is valid (or there are no boxes
        at all) 'boxes' has shape ``(0, 4)`` and the other three fields are
        empty, instead of raising as ``torch.stack`` does on an empty list.
    """
    boxes = target['boxes']
    if boxes.numel() == 0:
        # An empty box list becomes a 1-D tensor; give it the (0, 4) layout so
        # the column indexing below works.
        boxes = boxes.reshape(0, 4)

    # Ensure positive width and height
    keep = (boxes[:, 2] > boxes[:, 0]) & (boxes[:, 3] > boxes[:, 1])

    target['boxes'] = boxes[keep].to(device)
    target['labels'] = torch.as_tensor(target['labels'], dtype=torch.int64)[keep].to(device)
    target['area'] = torch.as_tensor(target['area'], dtype=torch.float32)[keep].to(device)
    target['iscrowd'] = torch.as_tensor(target['iscrowd'], dtype=torch.int64)[keep].to(device)

    return target


# Number of epochs for the training loop below (the same value is also assigned
# in an earlier configuration cell).
num_epochs = 10

In [None]:
import torch
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torch.utils.data import DataLoader
from torchvision.datasets import CocoDetection
import torchvision.transforms as T

def cust_evaluate(model, data_loader, device,
                  ann_file='./datasets/coco-human-dataset/images/val/validation.json'):
    """Run ``model`` over ``data_loader`` and print COCO bbox metrics.

    Args:
        model: Detection model. It is switched to evaluation mode and must
            return, per image, a dict with 'boxes' (``x1, y1, x2, y2`` rows),
            'scores' and 'labels'.
        data_loader: Yields ``(images, targets)`` batches where each target is
            the list of COCO annotation dicts of one image.
        device: Device the images are moved to before inference.
        ann_file: COCO ground-truth JSON used for scoring. Defaults to the
            validation annotations used elsewhere in this notebook.

    Returns:
        None. The summary is printed by ``COCOeval.summarize``. When the model
        produces no detections at all, a message is printed and the evaluation
        is skipped.
    """
    model.eval()
    coco = COCO(ann_file)

    cpu_device = torch.device("cpu")
    results = []

    for images, targets in data_loader:
        images = [img.to(device) for img in images]
        with torch.no_grad():
            outputs = model(images)
        outputs = [{k: v.to(cpu_device) for k, v in t.items()} for t in outputs]

        for target_list, output in zip(targets, outputs):
            if not target_list:
                # Without annotations there is no image id to attach results to.
                continue

            # The image id is read from the first annotation only, so each
            # detection is recorded exactly once per image rather than once per
            # ground-truth annotation.
            image_id = target_list[0]["image_id"]

            detections = zip(
                output["boxes"].tolist(),
                output["scores"].tolist(),
                output["labels"].tolist(),
            )
            for (x_min, y_min, x_max, y_max), score, category_id in detections:
                results.append({
                    "image_id": image_id,
                    "category_id": category_id,
                    # COCO results use [x, y, width, height].
                    "bbox": [x_min, y_min, x_max - x_min, y_max - y_min],
                    "score": score,
                })

    if not results:
        # There is nothing to load into COCOeval when no box was predicted.
        print("No detections were produced; skipping COCO evaluation.")
        return

    # Load results into COCOeval
    coco_results = coco.loadRes(results)
    coco_evaluator = COCOeval(coco, coco_results, 'bbox')

    coco_evaluator.evaluate()
    coco_evaluator.accumulate()
    coco_evaluator.summarize()


In [None]:
# Fine-tune the Faster R-CNN for `num_epochs` epochs, evaluating after each one.
# Training must run with autograd enabled: inside `torch.inference_mode()` no
# graph is recorded, so `losses.backward()` cannot compute gradients.
for epoch in range(num_epochs):
    print('Epoch Starting')
    model.train()  # Set to training mode (evaluation below switches it back to eval)
    for images, targets in train_loader:
        images = [image.to(device) for image in images]

        # Convert each image's list of COCO annotation dicts into the dict of
        # tensors that is passed to the model together with the images.
        formatted_targets = []
        for t in targets:
            boxes = []
            labels = []
            area = []
            iscrowd = []

            for obj in t:
                # COCO boxes are [x, y, width, height]; convert to corner format.
                xmin, ymin, width, height = obj['bbox']
                boxes.append([xmin, ymin, xmin + width, ymin + height])
                labels.append(obj['category_id'])
                area.append(obj['area'])
                iscrowd.append(obj['iscrowd'])

            boxes = torch.as_tensor(boxes, dtype=torch.float32)
            labels = torch.as_tensor(labels, dtype=torch.int64)
            area = torch.as_tensor(area, dtype=torch.float32)
            iscrowd = torch.as_tensor(iscrowd, dtype=torch.int64)
            image_id = torch.tensor([t[0]['image_id']], dtype=torch.int64)

            formatted_target = {
                'boxes': boxes,
                'labels': labels,
                'area': area,
                'iscrowd': iscrowd,
                'image_id': image_id
            }

            # Filter out invalid boxes (this also moves the tensors to `device`)
            formatted_target = filter_invalid_boxes(formatted_target, device)
            formatted_targets.append(formatted_target)

        # Forward pass: in training mode the model returns a dict of losses
        loss_dict = model(images, formatted_targets)
        losses = sum(loss for loss in loss_dict.values())

        # Backward pass
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

    lr_scheduler.step()

    # Evaluate model after each epoch
    cust_evaluate(model, val_loader, device=device)
    print('Epoch Completed')
    print('==========================')
print("Training complete!")
# Saved under its own name so the YOLO weights written earlier in this notebook
# ("trained_yolov9c_weights.pt") are not overwritten by the Faster R-CNN state dict.
torch.save(model.state_dict(), 'trained_fasterrcnn_weights.pt')


In [None]:
# Switch the model currently bound to `model` to evaluation mode before running
# inference with it.
model.eval()  # Set the model to evaluation mode

# COCO dataset class names, used as a lookup table indexed by the integer label
# the detector predicts (index 0 is '__background__', index 1 is 'person').
# NOTE(review): this repeats the table defined in an earlier cell; the 'N/A'
# entries look like placeholders for label ids that have no class - confirm.
COCO_INSTANCE_CATEGORY_NAMES = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A',
    'N/A', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana',
    'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut',
    'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table', 'N/A',
    'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
    'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book', 'clock',
    'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

def get_person_detections(image, threshold=0.5):
    """Run the global ``model`` on one image and return its 'person' boxes.

    Args:
        image: Image accepted by ``F.to_tensor``.
        threshold: A detection is kept only when its score is strictly
            greater than this value.

    Returns:
        A list of ``[(x_min, y_min), (x_max, y_max)]`` corner pairs, one per
        kept detection, in the order the model produced them.
    """
    batch = F.to_tensor(image).unsqueeze(0).to(device)
    with torch.no_grad():
        output = model(batch)[0]

    label_ids = output['labels'].cpu().numpy()
    corners = output['boxes'].cpu().numpy()
    confidences = output['scores'].cpu().numpy()

    person_boxes = []
    for label_id, (x_min, y_min, x_max, y_max), confidence in zip(label_ids, corners, confidences):
        is_person = COCO_INSTANCE_CATEGORY_NAMES[label_id] == 'person'
        if is_person and confidence > threshold:
            person_boxes.append([(x_min, y_min), (x_max, y_max)])
    return person_boxes

def plot_detections(image, boxes):
    """Display ``image`` with a red, unfilled rectangle drawn for every box.

    Args:
        image: Image to show with ``plt.imshow``.
        boxes: Iterable of ``[(x_min, y_min), (x_max, y_max)]`` corner pairs.
    """
    plt.figure(figsize=(12, 8))
    plt.imshow(image)
    axes = plt.gca()
    for top_left, bottom_right in boxes:
        box_width = bottom_right[0] - top_left[0]
        box_height = bottom_right[1] - top_left[1]
        outline = plt.Rectangle(top_left, box_width, box_height, fill=False, color='red')
        axes.add_patch(outline)
    plt.show()

# Load an image from disk as RGB.
# NOTE(review): `image_path` is an absolute, machine-specific path; point it at
# a local image before running.
image_path = r"C:\Users\azerr\Downloads\6237879203_f785d06a9a_b.jpg"
image = Image.open(image_path).convert('RGB')

# Get person detections with a score above 0.5
person_boxes = get_person_detections(image, threshold=0.5)

# Plot the detections on top of the image
plot_detections(image, person_boxes)