In [9]:
import os
import json
import cv2
import hashlib
from tqdm import tqdm
from pathlib import Path
from collections import defaultdict
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

# Set directory //  Convert JSON ‚Üí YOLO TXT 

In [None]:
# Set class map
CLASS_MAP = {
    "container": 0,
    "truck": 1
}

# Set up directories
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SPLITS = ["train_container", "validation_container"]
OUTPUT_DIR = os.path.join(BASE_DIR, "datasets_container", "yolo_dataset")

# Create output directories
for split in SPLITS:
    split_output_dir = os.path.join(OUTPUT_DIR, split)
    os.makedirs(os.path.join(split_output_dir, "images"), exist_ok=True)
    os.makedirs(os.path.join(split_output_dir, "labels"), exist_ok=True)


def get_file_hash(filepath):
    """
    Calculate MD5 hash of file to detect true duplicates
    """
    hash_md5 = hashlib.md5()
    try:
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    except:
        return None


def convert_json_to_yolo(json_path, img_path, label_output):
    """
    Convert JSON to YOLO format - supports both bbox and centre/size formats
    """
    try:
        with open(json_path) as f:
            data = json.load(f)
        
        img = cv2.imread(img_path)
        if img is None:
            print(f"‚ùå Cannot read image: {img_path}")
            return False
            
        h, w, _ = img.shape
        yolo_lines = []
        
        for obj in data.get("labels", []):
            label = obj.get("label_class")
            
            if label not in CLASS_MAP:
                continue
            
            cls_id = CLASS_MAP[label]
            
            # Support both formats
            if "centre" in obj and "size" in obj:
                cx = obj["centre"]["x"] / w
                cy = obj["centre"]["y"] / h
                bw = obj["size"]["x"] / w
                bh = obj["size"]["y"] / h
            elif "bbox" in obj:
                bbox = obj["bbox"]
                cx = (bbox[0] + bbox[2] / 2) / w
                cy = (bbox[1] + bbox[3] / 2) / h
                bw = bbox[2] / w
                bh = bbox[3] / h
            else:
                continue
            
            # Validate coordinates
            if not (0 <= cx <= 1 and 0 <= cy <= 1 and 0 < bw <= 1 and 0 < bh <= 1):
                print(f"‚ö†Ô∏è  Invalid coordinates in {json_path}")
                continue
                
            yolo_lines.append(f"{cls_id} {cx:.6f} {cy:.6f} {bw:.6f} {bh:.6f}")
        
        # Write YOLO format
        with open(label_output, "w") as f:
            f.write("\n".join(yolo_lines))
        
        return True
        
    except Exception as e:
        print(f"‚ùå Error converting {json_path}: {e}")
        return False


# ========================================
# DUPLICATE DETECTION & PREVENTION
# ========================================

print("\n" + "="*60)
print("üîç DUPLICATE DETECTION & CONVERSION")
print("="*60)

stats = {
    'total': 0,
    'success': 0,
    'skipped_exists': 0,
    'skipped_duplicate': 0,
    'failed': 0
}

# Track processed images by hash to detect true duplicates
processed_hashes = defaultdict(list)  # hash -> [list of filenames]
processed_filenames = set()  # Track filenames to detect name collisions

for split in SPLITS:
    print(f"\nüìÇ Processing split: {split}")
    
    input_dir = os.path.join(BASE_DIR, split)
    
    if not os.path.exists(input_dir):
        print(f"‚ö†Ô∏è  Input directory not found: {input_dir}")
        continue
    
    json_files = [f for f in os.listdir(input_dir) if f.endswith(".json")]
    print(f"üìä Found {len(json_files)} JSON files")
    
    for file in tqdm(json_files, desc=f"Converting {split}"):
        stats['total'] += 1
        
        try:
            json_path = os.path.join(input_dir, file)
            
            with open(json_path) as f:
                meta = json.load(f)
            
            img_name = meta.get("image_filename")
            if not img_name:
                print(f"‚ùå No 'image_filename' in {file}")
                stats['failed'] += 1
                continue
            
            img_path = os.path.join(input_dir, img_name)
            out_img = os.path.join(OUTPUT_DIR, split, "images", img_name)
            label_name = Path(img_name).stem + ".txt"
            out_lbl = os.path.join(OUTPUT_DIR, split, "labels", label_name)
            
            # ============================================
            # CHECK 1: Already processed (skip if exists)
            # ============================================
            if os.path.exists(out_lbl):
                stats['skipped_exists'] += 1
                continue
            
            # ============================================
            # CHECK 2: Filename collision detection
            # ============================================
            if img_name in processed_filenames:
                print(f"‚ö†Ô∏è  FILENAME COLLISION: {img_name} already processed")
                stats['skipped_duplicate'] += 1
                continue
            
            # ============================================
            # CHECK 3: True duplicate detection (by hash)
            # ============================================
            if os.path.exists(img_path):
                img_hash = get_file_hash(img_path)
                
                if img_hash and img_hash in processed_hashes:
                    original_file = processed_hashes[img_hash][0]
                    print(f"üîÑ DUPLICATE IMAGE DETECTED:")
                    print(f"   Original: {original_file}")
                    print(f"   Duplicate: {img_name}")
                    stats['skipped_duplicate'] += 1
                    continue
                
                # Record this image
                processed_hashes[img_hash].append(img_name)
                processed_filenames.add(img_name)
            else:
                print(f"‚ùå Image not found: {img_path}")
                stats['failed'] += 1
                continue
            
            # ============================================
            # COPY & CONVERT (no duplicates detected)
            # ============================================
            import shutil
            shutil.copy2(img_path, out_img)
            
            if convert_json_to_yolo(json_path, img_path, out_lbl):
                stats['success'] += 1
            else:
                stats['failed'] += 1
                # Remove copied image if conversion failed
                if os.path.exists(out_img):
                    os.remove(out_img)
                
        except Exception as e:
            print(f"‚ùå Error processing {file}: {e}")
            stats['failed'] += 1

# ========================================
# SUMMARY & DUPLICATE REPORT
# ========================================
print("\n" + "="*60)
print("üìä CONVERSION SUMMARY")
print("="*60)
print(f"Total files:              {stats['total']}")
print(f"‚úÖ Success:               {stats['success']}")
print(f"‚è≠Ô∏è  Skipped (exists):      {stats['skipped_exists']}")
print(f"üîÑ Skipped (duplicates):  {stats['skipped_duplicate']}")
print(f"‚ùå Failed:                {stats['failed']}")
print("="*60)

# Report duplicate groups
duplicate_groups = {k: v for k, v in processed_hashes.items() if len(v) > 1}
if duplicate_groups:
    print(f"\n‚ö†Ô∏è  Found {len(duplicate_groups)} groups of duplicate images:")
    for hash_val, files in list(duplicate_groups.items())[:5]:  # Show first 5
        print(f"   - {files}")
    if len(duplicate_groups) > 5:
        print(f"   ... and {len(duplicate_groups) - 5} more groups")
else:
    print("\n‚úÖ No duplicate images found!")

print("="*60)

NameError: name '__file__' is not defined

# Handle NEGATIVE Images

In [None]:
"""Don't make label txt for negative images"""
NEGATIVE_DIR = os.path.join(BASE_DIR, "datasets_container", "negative")

for file in os.listdir(NEGATIVE_DIR):
    if file.endswith('.jpg'):

        src_path = os.path.join(NEGATIVE_DIR, file)
        dest_path = os.path.join(OUTPUT_DIR, "train", "images", file)

        os.system(f"cp {src_path} {dest_path}")

# Create dataset.yaml

In [None]:
dataset_yaml = f"""
path: {OUTPUT_DIR}

train: train/images
val: validation/images

nc: 2

names:
    0: container
    1: truck
"""

with open(os.path.join(OUTPUT_DIR, "dataset.yaml"), "w") as f:
    f.write(dataset_yaml)

# Train YOLOv8

In [None]:
from ultralytics import YOLO

model = YOLO("yolov8n.pt") # nano = faster model inference

results = model.train(
    data=os.path.join(OUTPUT_DIR, "dataset.yaml"),
    epochs=100,
    imgsz=640,
    batch=16,
    device=0, # Use GPU 0
    patience=15, # Early stopping if no improvement for 15 epochs
    workers=4, # Number of data loading workers
    project=os.path.join(BASE_DIR, "runs_container"), 
    name="yolov8n_container_detection"
)

# Export Best Model ‚Üí models/container_yolov8.pt

In [None]:
# Export best model to models/container_yolov8.pt

import shutil

best_model = os.path.join(BASE_DIR, "runs_container", "yolov8n_container_detection", "weights", "best.pt")
TARGET_PATH = os.path.join(BASE_DIR, "models", "container_yolov8.pt")

# Copy best model to target path
shutil.copy(best_model, TARGET_PATH)
print(f"YOLOV8 best model has been exported to {TARGET_PATH} ‚úÖ")

# Quick Validation Test

In [None]:
model = YOLO(TARGET_PATH)
img = "datasets_container/validation/3de22b94583ee0f7defd6c5a6ce439dc2610efad.jpg" # Example image path

# Perform inference
results = model.predict(source=img, conf=0.4, save=True)
results[0].plot()