In [None]:
# -------- Block 1: Mount Google Drive --------
# Attach Google Drive to the Colab VM; every project path used by the later
# cells lives beneath this mount point.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
!pip install -U ultralytics

In [None]:
import os
import zipfile
import shutil
import random
from pathlib import Path
from ultralytics import YOLO

In [None]:
# -------- Block 4: Paths --------
# Project root on the mounted Drive and the working dataset folder beneath it.
DRIVE_BASE = "/content/drive/MyDrive/genai_kids_room_project"
DATA_DIR = os.path.join(DRIVE_BASE, "yolo_training_dataset")
os.makedirs(DATA_DIR, exist_ok=True)  # created on first run, reused afterwards

# Archive file names, keyed by the DATA_DIR sub-folder each one is extracted
# into (adjust the names so they match the files on Drive exactly).
ARCHIVES = dict(
    images_one="one.zip",
    images_razmetka="razmetka.zip",
    labels_razmetka="task_1901954_annotations_2026_01_03_16_12_16_yolo 1.1.zip",
    labels_one="task_1902028_annotations_2026_01_03_16_25_15_yolo 1.1.zip",
)


In [None]:
# -------- Block 5: Extract all archives --------
# Every archive is read from <DRIVE_BASE>/yolo_dataset and unpacked into its
# own folder <DATA_DIR>/<key>, which is created when missing.
archives_root = os.path.join(DRIVE_BASE, "yolo_dataset")

for key in ARCHIVES:
    archive_name = ARCHIVES[key]

    extract_path = os.path.join(DATA_DIR, key)
    os.makedirs(extract_path, exist_ok=True)

    archive_path = os.path.join(archives_root, archive_name)
    with zipfile.ZipFile(archive_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)

    print(f"{key} extracted to {extract_path}")


In [None]:
# -------- Block 6: Pair extracted images with labels (sanity check) --------
# This cell discovers image/label pairs in the folders extracted by Block 5
# and reports how many exist.
#
# It deliberately does NOT copy files into images/{train,val} or
# labels/{train,val}: those folders are created, and the single train/val
# split is performed, in Block 8. Shuffling and splitting here as well would
# run two independent splits and could leave the same file in both train and
# val. `train_ratio` and `train_count` are therefore defined by Block 8 only.
IMG_EXTS = [".png", ".jpg", ".jpeg"]


def _single_subdir_or_self(folder):
    """Return the only sub-directory of ``folder``, else ``folder`` unchanged.

    A missing ``folder`` is returned as-is so the caller can report it.
    """
    if not os.path.isdir(folder):
        return folder
    nested = [
        os.path.join(folder, name)
        for name in os.listdir(folder)
        if os.path.isdir(os.path.join(folder, name))
    ]
    return nested[0] if len(nested) == 1 else folder


# Block 5 extracts into <DATA_DIR>/images_<key> and <DATA_DIR>/labels_<key>.
# An archive may wrap its content in one top-level folder (CVAT label exports
# use obj_train_data), so step into that folder when it is the only one.
images_dirs = {
    key: _single_subdir_or_self(os.path.join(DATA_DIR, f"images_{key}"))
    for key in ["one", "razmetka"]
}
labels_dirs = {
    key: _single_subdir_or_self(os.path.join(DATA_DIR, f"labels_{key}"))
    for key in ["one", "razmetka"]
}

all_images = []

for key in ["one", "razmetka"]:
    img_path = images_dirs[key]
    label_path = labels_dirs[key]

    if not os.path.exists(img_path) or not os.path.exists(label_path):
        print(f"Path not found: {img_path} or {label_path}")
        continue

    # Sorted so the discovered order does not depend on the file system.
    for img_file in sorted(os.listdir(img_path)):
        if Path(img_file).suffix.lower() not in IMG_EXTS:
            continue
        # A label shares the image's stem and uses the .txt extension.
        txt_path = os.path.join(label_path, Path(img_file).stem + ".txt")
        if os.path.exists(txt_path):
            all_images.append({
                "img": os.path.join(img_path, img_file),
                "label": txt_path
            })

print(f"Total images found: {len(all_images)}")


In [None]:
# -------- Block 7: Combine images and labels --------
# File suffixes (lower-case, leading dot included) that are treated as images.
IMG_EXTS = [
    ".png",
    ".jpg",
    ".jpeg",
]

# Accumulator of {"img": ..., "label": ...} records; emptied on every run of
# this cell so that re-running does not append duplicates.
all_items = list()

# An extracted archive may wrap its content in one top-level folder (CVAT
# label exports use obj_train_data); this helper steps into that folder when
# the choice is unambiguous.
def get_subdir(base_dir):
    """Return the single sub-directory of ``base_dir``, or ``base_dir`` itself.

    Args:
        base_dir: Path of an existing directory to inspect.

    Returns:
        The joined path of the only immediate sub-directory when exactly one
        exists; otherwise ``base_dir`` unchanged (zero or several
        sub-directories).
    """
    nested = []
    for entry in os.listdir(base_dir):
        candidate = os.path.join(base_dir, entry)
        if os.path.isdir(candidate):
            nested.append(candidate)

    if len(nested) != 1:
        return base_dir
    return nested[0]

# Images: one extraction folder per dataset key, unwrapped by get_subdir.
images_paths = {
    "one": get_subdir(os.path.join(DATA_DIR, "images_one")),
    "razmetka": get_subdir(os.path.join(DATA_DIR, "images_razmetka"))
}

# Labels: same layout as the images, taken from the label archives.
labels_paths = {
    "one": get_subdir(os.path.join(DATA_DIR, "labels_one")),
    "razmetka": get_subdir(os.path.join(DATA_DIR, "labels_razmetka"))
}

# Collect all image-label pairs
skipped_without_label = 0  # images that have no matching .txt file

for key in ["one", "razmetka"]:
    img_folder = images_paths[key]
    label_folder = os.path.join(labels_paths[key], "obj_train_data")  # CVAT export folder

    if not os.path.exists(label_folder):
        label_folder = labels_paths[key]  # fallback if no obj_train_data

    # os.listdir returns entries in arbitrary order; sorting keeps all_items
    # (and therefore any later split) independent of the file system.
    for img_file in sorted(os.listdir(img_folder)):
        if Path(img_file).suffix.lower() not in IMG_EXTS:
            continue

        # A label shares the image's stem and uses the .txt extension.
        label_file = os.path.join(label_folder, Path(img_file).stem + ".txt")
        if os.path.exists(label_file):
            all_items.append({
                "img": os.path.join(img_folder, img_file),
                "label": label_file
            })
        else:
            skipped_without_label += 1

print(f"Total image-label pairs found: {len(all_items)}")
if skipped_without_label:
    # Surface silently dropped images so a naming mismatch is noticed early.
    print(f"Images skipped because no matching .txt label exists: {skipped_without_label}")

In [None]:
# -------- Block 8: Shuffle and split train/val --------
# Copies every pair from `all_items` into images/{train,val} and
# labels/{train,val} under DATA_DIR using a reproducible 80/20 split.

SPLIT_SEED = 42    # fixed seed so the same files land in the same split every run
train_ratio = 0.8  # share of the pairs that goes to train; the rest goes to val

# Start from empty split folders. Without this, re-running the cell (or
# changing the dataset) leaves copies from an earlier split behind, and the
# same file can end up in both train and val.
for split in ["train", "val"]:
    for kind in ["images", "labels"]:
        split_dir = os.path.join(DATA_DIR, kind, split)
        shutil.rmtree(split_dir, ignore_errors=True)
        os.makedirs(split_dir, exist_ok=True)

# Shuffle a sorted copy with a private RNG: the result depends neither on the
# order in which the pairs were discovered nor on how often this cell has
# been run, and `all_items` itself is left untouched.
shuffled_items = sorted(all_items, key=lambda entry: entry["img"])
random.Random(SPLIT_SEED).shuffle(shuffled_items)
train_count = int(len(shuffled_items) * train_ratio)

for i, item in enumerate(shuffled_items):
    split = "train" if i < train_count else "val"
    shutil.copy(item["img"], os.path.join(DATA_DIR, "images", split, os.path.basename(item["img"])))
    shutil.copy(item["label"], os.path.join(DATA_DIR, "labels", split, os.path.basename(item["label"])))

print("Train/Val split completed. Total images:", len(all_items))

In [None]:
# -------- Block 9: Create data.yaml --------
# Dataset description later passed to model.train(): the two image folders,
# the class count and the class names.
# NOTE(review): the order of `names` presumably has to match the class indices
# used in the label files - confirm against the annotation export.
yaml_path = os.path.join(DATA_DIR, "data.yaml")

train_images_dir = os.path.join(DATA_DIR, 'images/train')
val_images_dir = os.path.join(DATA_DIR, 'images/val')

yaml_text = f"""
train: {train_images_dir}
val: {val_images_dir}

nc: 4
names: ['sharp_object', 'electrical_hazard', 'chemical_danger', 'choking_hazard']
"""

with open(yaml_path, "w") as f:
    f.write(yaml_text)

print("data.yaml created at", yaml_path)

In [None]:
# -------- Block 10: Train YOLOv8 --------
# Start from the "yolov8n.pt" checkpoint (nano model for fast start).
model = YOLO("yolov8n.pt")

# Training settings gathered in one place and forwarded to model.train();
# the run is stored under <DRIVE_BASE>/yolo_training with the given name.
train_settings = dict(
    data=yaml_path,
    epochs=50,
    imgsz=640,
    batch=4,
    name="kids_room_hazard",
    project=os.path.join(DRIVE_BASE, "yolo_training"),
)

model.train(**train_settings)

In [None]:
# -------- Block: Validate trained model (Ultralytics v8.3+) --------
from ultralytics import YOLO

# Weights of the best checkpoint written by the training run.
# NOTE(review): the run folder name is hard-coded; if training was started
# more than once, confirm that this is still the run you want to validate.
best_model_path = "/content/drive/MyDrive/genai_kids_room_project/yolo_training/kids_room_hazard/weights/best.pt"

# Load trained model
model = YOLO(best_model_path)

# Run validation and take the metrics mapping from the returned object
# (a DetMetrics object, per the original author's note).
results = model.val()
metrics_dict = results.results_dict

# Printed label (including its padding) paired with the key in results_dict.
metric_rows = [
    ("Precision: ", "metrics/precision(B)"),
    ("Recall:    ", "metrics/recall(B)"),
    ("mAP@0.5:  ", "metrics/mAP50(B)"),
    ("mAP@0.5:0.95: ", "metrics/mAP50-95(B)"),
]

print("Validation Metrics:")
for label, metric_key in metric_rows:
    print(f"{label}{metrics_dict[metric_key]:.4f}")