# Kaggle Stuff

## Setup: Install kagglehub
This cell installs the `kagglehub` helper used to fetch datasets directly into the notebook environment. Run this first to ensure the download cells work.

In [None]:
%pip install kagglehub

In [None]:
# Centralized imports for the notebook (moved here to avoid duplicates)
import os
import json
import csv
import random
import re
from pathlib import Path
from PIL import Image, ImageOps
import numpy as np
import matplotlib.pyplot as plt
import kagglehub

## Download traffic sign classification dataset
This cell downloads the traffic sign classification dataset. It will print the path where the dataset was stored locally.

In [None]:
# Fetch the newest published version of the traffic-sign classification dataset
# and report where it ended up on disk.
path = kagglehub.dataset_download(
    "ahemateja19bec1025/traffic-sign-dataset-classification"
)

print(f"Path to dataset files: {path}")

## Download car detection dataset
This cell downloads a second dataset (car detection). We fetch both datasets so we can later combine relevant images for training/analysis.

In [None]:

# Fetch the newest published version of the car-detection dataset.
# NOTE: this rebinds `path`, so the variable no longer holds the location of
# the traffic-sign dataset downloaded earlier.
path = kagglehub.dataset_download(
    "pkdarabi/cardetection"
)

print(f"Path to dataset files: {path}")

# Dataset 2

## Build mapping for Dataset 2 (image path -> human-readable label)
This block reads the provided CSV label file and constructs a dictionary mapping absolute image paths to their human-readable labels. It handles both training folders (labeled by subdirectory) and the TEST folder (filenames contain class id).

In [None]:
# Map each absolute image path to its human-readable label.

root = Path(r"dataset2/traffic_Data")
labels_csv = Path(r"dataset2/labels.csv")

# ClassId -> Name lookup read from labels.csv; both fields are whitespace-trimmed
# and the id is kept as a string so it can be compared with folder/file names.
with labels_csv.open(newline='', encoding='utf-8') as f:
    label_map = {
        str(row['ClassId']).strip(): row['Name'].strip()
        for row in csv.DictReader(f)
    }

# Helper: pull the class id out of a TEST filename (the part before the first underscore)
def extract_test_class_id(filename: str) -> str:
    """Return the class-id prefix of *filename* without zero padding.

    The prefix is everything before the first underscore. When it parses as
    an integer it is returned in plain decimal form (``"007"`` -> ``"7"``),
    because labels.csv uses non-padded integers; otherwise the raw prefix is
    returned unchanged.
    """
    prefix, _, _ = filename.partition('_')
    try:
        normalized = str(int(prefix))
    except ValueError:
        return prefix  # not numeric: hand the raw token back
    return normalized

data_label_map = {}

# Only files with these (case-insensitive) suffixes are treated as images.
IMAGE_SUFFIXES = {'.png', '.jpg', '.jpeg'}

# Training data: DATA/<class id>/... — the sub-directory name is the class id
train_dir = root / 'DATA'
if train_dir.exists():
    for class_dir in sorted(d for d in train_dir.iterdir() if d.is_dir()):
        class_id = class_dir.name  # e.g. '0', '1', '10', ...
        # Ids missing from labels.csv get a recognisable placeholder label
        human_label = label_map.get(class_id, f"<UNKNOWN:{class_id}>")
        for img_path in class_dir.rglob('*'):
            if img_path.is_file() and img_path.suffix.lower() in IMAGE_SUFFIXES:
                data_label_map[str(img_path.resolve())] = human_label

# Test data: flat TEST folder; the token before the first '_' is the zero-padded class id
test_dir = root / 'TEST'
if test_dir.exists():
    for img_path in sorted(p for p in test_dir.iterdir() if p.is_file()):
        # Apply the same image filter as the training scan. Without it, stray
        # non-image files would be recorded as datapoints and later fail to
        # open as images.
        if img_path.suffix.lower() not in IMAGE_SUFFIXES:
            continue
        class_id_norm = extract_test_class_id(img_path.name)
        human_label = label_map.get(class_id_norm, f"<UNKNOWN:{class_id_norm}>")
        data_label_map[str(img_path.resolve())] = human_label

print(f"Total datapoints collected: {len(data_label_map)}")
# Show a small sample
sample_items = list(data_label_map.items())[:10]
for p, lbl in sample_items:
    print(p, '->', lbl)

# Keep the dictionary under a second name for later use
all_data_label_map = data_label_map

## Clean: remove unknown labels
Filter out any examples whose label begins with `Unknown` to remove unlabeled or suspect data points.

In [None]:
# Drop every entry without a usable class name:
#   - labels.csv names that start with "Unknown" (exact capitalisation), and
#   - "<UNKNOWN:id>" placeholders, which are assigned when a class id has no
#     entry in labels.csv and would otherwise slip past the "Unknown" check.
UNUSABLE_LABEL_PREFIXES = ("Unknown", "<UNKNOWN:")
all_data_label_map = {
    p: lbl
    for p, lbl in all_data_label_map.items()
    if not lbl.startswith(UNUSABLE_LABEL_PREFIXES)
}
print(f"Total datapoints after cleaning: {len(all_data_label_map)}")

## Save dataset2 mappings to JSON
Persist both the image->label mapping and the class-id->name mapping so downstream cells and scripts can reuse them without recomputing.

In [None]:
# Persist the image-path -> label mapping as JSON.
with open('dataset2_labels.json', 'w', encoding='utf-8') as out_file:
    json.dump(all_data_label_map, out_file, ensure_ascii=False, indent=4)

# Re-read dataset2/labels.csv (columns ClassId,Name) and persist that
# ClassId -> Name lookup as a second JSON file.
file_path = 'dataset2/labels.csv'
with open(file_path, 'r', encoding='utf-8') as csv_file:
    label_map = {
        str(row['ClassId']).strip(): row['Name'].strip()
        for row in csv.DictReader(csv_file)
    }
with open('label_map.json', 'w', encoding='utf-8') as out_file:
    json.dump(label_map, out_file, ensure_ascii=False, indent=4)

## EDA: image dimension distributions (Dataset 2)
Sample a subset of images and plot width/height distributions to decide a target resolution for processing.

In [None]:
# EDA of image dimensions: sample up to 5000 images and plot width/height histograms

with open('dataset2_labels.json', 'r', encoding='utf-8') as f:
    all_data_label_map = json.load(f)

# random.sample raises ValueError when asked for more items than the population
# holds, so the sample size is capped at the number of available images.
all_paths = list(all_data_label_map)
if not all_paths:
    raise ValueError("dataset2_labels.json contains no images to sample")
sample_paths = random.sample(all_paths, min(5000, len(all_paths)))

dimensions = []
# A dedicated loop variable keeps the `path` set by the download cells intact
for sample_path in sample_paths:
    with Image.open(sample_path) as img:
        dimensions.append(img.size)  # (width, height)
widths, heights = zip(*dimensions)

plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.hist(widths, bins=20, color='blue', alpha=0.7)
plt.title('Image Width Distribution')
plt.xlabel('Width (pixels)')
plt.ylabel('Frequency')
plt.subplot(1, 2, 2)
plt.hist(heights, bins=20, color='green', alpha=0.7)
plt.title('Image Height Distribution')
plt.xlabel('Height (pixels)')
plt.ylabel('Frequency')
plt.tight_layout()
plt.show()

* The histograms suggest that most images are either 128x128 or 256x256.
* Let's use 256x256 as the target size so that we don't lose detail.

In [None]:
# Rescale all dataset2 images to 256x256

TARGET_SIZE = 256

# Current image-path -> label mapping
dataset2_labels = json.loads(Path('dataset2_labels.json').read_text(encoding='utf-8'))

# Destination folder for the processed copies (created if it does not exist yet)
output_dir = Path(r"dataset2_processed")
output_dir.mkdir(parents=True, exist_ok=True)

def process_image(img: Image.Image, target_size: int = 256) -> Image.Image:
    """Return a ``target_size`` x ``target_size`` version of *img* without rescaling.

    Each axis is handled independently:

    - longer than ``target_size``: center-cropped down to ``target_size``;
    - shorter than ``target_size``: padded on both sides by reflecting the
      image content (an odd leftover pixel goes to the right/bottom).

    Args:
        img: Source image; it is not modified.
            NOTE(review): the padding step round-trips through a NumPy array,
            so palette-mode images presumably lose their palette there —
            convert to RGB or grayscale before calling.
        target_size: Edge length of the square output, in pixels.

    Returns:
        A new image of size ``(target_size, target_size)``.
    """
    width, height = img.size

    if width == target_size and height == target_size:
        return img.copy()

    # Center-crop whichever axes are too long; axes that already fit stay whole.
    keep_w = min(width, target_size)
    keep_h = min(height, target_size)
    if (keep_w, keep_h) != (width, height):
        left = (width - keep_w) // 2
        top = (height - keep_h) // 2
        img = img.crop((left, top, left + keep_w, top + keep_h))

    if keep_w == target_size and keep_h == target_size:
        return img  # cropping alone reached the target size

    # Split the missing pixels evenly between the two sides of each short axis
    pad_left = (target_size - keep_w) // 2
    pad_right = target_size - keep_w - pad_left
    pad_top = (target_size - keep_h) // 2
    pad_bottom = target_size - keep_h - pad_top

    img_array = np.array(img)
    pad_width = [(pad_top, pad_bottom), (pad_left, pad_right)]
    if img_array.ndim == 3:
        pad_width.append((0, 0))  # never pad the channel axis

    try:
        padded = np.pad(img_array, pad_width, mode='reflect')
    except ValueError:
        # np.pad rejected the request (for example an axis of length zero).
        # Fall back to a constant black border; any other error is left to
        # propagate so it is not silently turned into black padding.
        return ImageOps.expand(img, border=(pad_left, pad_top, pad_right, pad_bottom), fill='black')
    return Image.fromarray(padded)

# Run every image listed in dataset2_labels through process_image
success_count = 0
error_count = 0
updated_labels = {}

for source_str, label in dataset2_labels.items():
    source = Path(source_str)

    # Files that vanished since the mapping was built are counted as errors
    if not source.exists():
        error_count += 1
        print(f"Missing: {source.name}")
        continue

    try:
        with Image.open(source) as opened:
            # Normalise to RGB first (some images might be grayscale or RGBA)
            rgb = opened if opened.mode == 'RGB' else opened.convert('RGB')
            processed = process_image(rgb, TARGET_SIZE)

            # Same filename, but inside the output directory
            output_path = output_dir / source.name
            processed.save(output_path)

            # Record the label under the new location
            updated_labels[str(output_path.resolve())] = label
            success_count += 1

    except Exception as e:
        # Best effort: report the failure and carry on with the next image
        error_count += 1
        print(f"Error processing {source.name}: {e}")

# Overwrite the label file so it now points at the processed copies
with open('dataset2_labels.json', 'w', encoding='utf-8') as f:
    json.dump(updated_labels, f, ensure_ascii=False, indent=2)

print(f"Successfully processed: {success_count}")
print(f"Errors: {error_count}")
print(f"Output directory: {output_dir}")
print("Updated dataset2_labels.json with new paths")

## Verify processed image dimensions (Dataset 2)
Plot the distributions again after processing to make sure images are now uniform (256x256).

In [None]:
# Plot the distribution of image dimensions for the processed images listed in dataset2_labels.json

with open('dataset2_labels.json', 'r', encoding='utf-8') as f:
    dataset2_labels = json.load(f)

# random.sample raises ValueError when asked for more items than the population
# holds, so the sample size is capped at the number of available images.
all_paths = list(dataset2_labels)
if not all_paths:
    raise ValueError("dataset2_labels.json contains no images to sample")
sample_paths = random.sample(all_paths, min(5000, len(all_paths)))

dimensions = []
# A dedicated loop variable keeps the `path` set by the download cells intact
for sample_path in sample_paths:
    with Image.open(sample_path) as img:
        dimensions.append(img.size)  # (width, height)
widths, heights = zip(*dimensions)

plt.figure(figsize=(12, 5))
plt.hist(widths, bins=50, alpha=0.7, label='Widths')
plt.hist(heights, bins=50, alpha=0.7, label='Heights')
plt.xlabel('Pixels')
plt.ylabel('Frequency')
plt.title('Distribution of Image Dimensions')
plt.legend()
plt.show()

# Dataset 1

## Build mapping for Dataset 1 (YOLO Roboflow dataset)
Parse `data.yaml` to extract class names and then scan YOLO label files. Only include images with a single object (single-line label files) to build an image->class mapping.

In [None]:
# Build dictionary of single-object images (absolute path -> class name) for Roboflow YOLO dataset (dataset1)

dataset1_root = Path(r"dataset1")

# data.yaml is small, so the class names are pulled out with a regex rather
# than a YAML parser. Only the inline form `names: [a, b, ...]` is recognised.
yaml_path = dataset1_root / "data.yaml"
raw_yaml = yaml_path.read_text(encoding="utf-8")
names_match = re.search(r"names:\s*\[(.*?)\]", raw_yaml, re.DOTALL)
if names_match is None:
    raise ValueError("Could not find names list in data.yaml")

# Comma-split the bracket contents, then trim whitespace and surrounding quotes
names_part = names_match.group(1)
class_names = [entry.strip().strip("'\"") for entry in names_part.split(',')]

# Image and label folders of the train/valid/test splits, directly under the dataset root
train_images_dir, valid_images_dir, test_images_dir = (
    dataset1_root / split / "images" for split in ("train", "valid", "test")
)
train_labels_dir, valid_labels_dir, test_labels_dir = (
    dataset1_root / split / "labels" for split in ("train", "valid", "test")
)

def find_image_for_label(label_file: Path) -> Path | None:
    stem = label_file.stem  # YOLO: same stem
    for ext in ('.jpg', '.jpeg', '.png'):  # common
        for parent in (train_images_dir, valid_images_dir, test_images_dir):
            candidate = parent / f"{stem}{ext}"
            if candidate.exists():
                return candidate
    return None

# Results and counters filled in by process_labels_dir
single_object_map = {}
multiple_object_count = 0
missing_image_count = 0

def process_labels_dir(labels_dir: Path):
    """Record every single-object image described by the label files in *labels_dir*.

    Each ``.txt`` file that holds exactly one non-blank row with a valid class
    index adds ``resolved image path -> class name`` to ``single_object_map``.
    Files with several rows increment ``multiple_object_count``; files whose
    image cannot be found increment ``missing_image_count``. Empty files,
    unparseable class indices and out-of-range indices are skipped silently.
    A missing directory is ignored.
    """
    global multiple_object_count, missing_image_count
    if not labels_dir.exists():
        return
    for label_path in labels_dir.iterdir():
        if not (label_path.is_file() and label_path.suffix == '.txt'):
            continue
        text = label_path.read_text(encoding='utf-8')
        rows = [row for row in map(str.strip, text.splitlines()) if row]
        if len(rows) > 1:  # several objects in one image
            multiple_object_count += 1
            continue
        if not rows:  # empty label file
            continue
        # Row layout: class_idx xc yc w h. The row is non-blank, so splitting
        # it always yields at least the first token.
        try:
            class_idx = int(rows[0].split()[0])
        except ValueError:
            continue
        if class_idx < 0 or class_idx >= len(class_names):
            continue
        image_path = find_image_for_label(label_path)
        if image_path is None:
            missing_image_count += 1
            continue
        single_object_map[str(image_path.resolve())] = class_names[class_idx]

# Scan the label folders of all three splits
for split_labels_dir in (train_labels_dir, valid_labels_dir, test_labels_dir):
    process_labels_dir(split_labels_dir)

print(f"Total single-object images: {len(single_object_map)}")
print(f"Skipped (multi-object) label files: {multiple_object_count}")
print(f"Missing image files: {missing_image_count}")
# Preview the first ten entries, numbered from 1
preview = list(single_object_map.items())[:10]
for position, (image_path, class_name) in enumerate(preview, start=1):
    print(f"{position}. {image_path} -> {class_name}")

# Persist the mapping as JSON
out_json = 'dataset1_labels.json'
with open(out_json, 'w', encoding='utf-8') as out_file:
    json.dump(single_object_map, out_file, ensure_ascii=False, indent=2)
print(f"Saved dictionary to {out_json}")

## EDA: image dimension distributions (Dataset 1)
Sample images and visualize width/height distributions to inform cropping/resizing strategy for dataset 1 images.

In [None]:
# Sample up to 3000 images from the single-object map and plot their width/height distributions
import random
from PIL import Image
with open('dataset1_labels.json', 'r', encoding='utf-8') as f:
    single_object_map = json.load(f)

# random.sample raises ValueError when asked for more items than the population
# holds, so the sample size is capped at the number of available images.
all_paths = list(single_object_map)
if not all_paths:
    raise ValueError("dataset1_labels.json contains no images to sample")
sample_paths = random.sample(all_paths, min(3000, len(all_paths)))

dimensions = []
# A dedicated loop variable keeps the `path` set by the download cells intact
for sample_path in sample_paths:
    with Image.open(sample_path) as img:
        dimensions.append(img.size)  # (width, height)
widths, heights = zip(*dimensions)

plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.hist(widths, bins=20, color='blue', alpha=0.7)
plt.title('Image Width Distribution (Dataset 1)')
plt.xlabel('Width (pixels)')
plt.ylabel('Frequency')
plt.subplot(1, 2, 2)
plt.hist(heights, bins=20, color='green', alpha=0.7)
plt.title('Image Height Distribution (Dataset 1)')
plt.xlabel('Height (pixels)')
plt.ylabel('Frequency')
plt.tight_layout()
plt.show()

## Crop 256x256 regions around labeled bounding boxes (Dataset 1)
For each single-object image, read its YOLO label, compute the sign center in pixel coordinates, and crop a 256x256 patch centered on the sign. Save crops to `dataset1_cropped`.

In [None]:
# Crop 256x256 regions around traffic signs from 416x416 images in dataset1
# Save cropped images to a new directory with the same filenames

from pathlib import Path
from PIL import Image
import json

# Load the dataset1 labels
with open('dataset1_labels.json', 'r', encoding='utf-8') as f:
    dataset1_labels = json.load(f)

dataset1_root = Path(r"dataset1")
output_dir = Path(r"dataset1_cropped")
output_dir.mkdir(parents=True, exist_ok=True)

# Label directories to search for the txt file that belongs to an image.
# All three splits are listed: dataset1_labels.json is built from the train,
# valid and test label folders, so leaving "valid" out would make every
# validation image get skipped here and then dropped from the labels.
label_dirs = [
    dataset1_root / "train" / "labels",
    dataset1_root / "valid" / "labels",
    dataset1_root / "test" / "labels",
]

def find_label_file(image_path: Path) -> Path | None:
    """Find the corresponding YOLO label txt file for an image."""
    stem = image_path.stem
    for label_dir in label_dirs:
        label_file = label_dir / f"{stem}.txt"
        if label_file.exists():
            return label_file
    return None

def get_bbox_from_label(label_file: Path) -> tuple | None:
    """Parse YOLO label file and return (x_center, y_center, width, height) normalized."""
    lines = [ln.strip() for ln in label_file.read_text(encoding='utf-8').splitlines() if ln.strip()]
    if len(lines) != 1:
        return None
    parts = lines[0].split()
    if len(parts) < 5:
        return None
    try:
        x_center = float(parts[1])
        y_center = float(parts[2])
        width = float(parts[3])
        height = float(parts[4])
        return (x_center, y_center, width, height)
    except ValueError:
        return None

def crop_around_bbox(img: Image.Image, bbox: tuple, crop_size: int = 256) -> Image.Image:
    """
    Cut a crop_size x crop_size window out of *img*, centered on the bbox center.

    *bbox* is (x_center, y_center, width, height) in normalized coordinates;
    only the center is used. A window that would stick out of the image is
    slid back inside. On an axis where the image is shorter than crop_size,
    the whole axis is returned, so the result is smaller than crop_size there.
    """
    img_width, img_height = img.size
    x_center_norm, y_center_norm = bbox[0], bbox[1]

    # Normalized center -> pixel coordinates (truncated toward zero)
    center_x = int(x_center_norm * img_width)
    center_y = int(y_center_norm * img_height)

    half_crop = crop_size // 2

    # Clamp the window start to [0, image length - crop_size]; when the image
    # is shorter than crop_size the upper limit is negative and the start is 0.
    left = max(0, min(center_x - half_crop, img_width - crop_size))
    top = max(0, min(center_y - half_crop, img_height - crop_size))

    # The window end never passes the image edge
    right = min(img_width, left + crop_size)
    bottom = min(img_height, top + crop_size)

    return img.crop((left, top, right, bottom))

# Crop every image listed in dataset1_labels around its labeled sign
success_count = 0
skip_count = 0
error_count = 0

for source_str in dataset1_labels:
    source = Path(source_str)

    # Images that no longer exist are counted as errors
    if not source.exists():
        error_count += 1
        continue

    # A usable box needs both a label file and a parseable single-object row
    label_file = find_label_file(source)
    bbox = None if label_file is None else get_bbox_from_label(label_file)
    if bbox is None:
        skip_count += 1
        continue

    try:
        with Image.open(source) as img:
            # Skip images that are too small for a full 256x256 crop
            img_w, img_h = img.size
            if min(img_w, img_h) < 256:
                skip_count += 1
                continue

            # Crop around the traffic sign
            cropped = crop_around_bbox(img, bbox, crop_size=256)

            # Same filename, but inside the output directory
            output_path = output_dir / source.name
            cropped.save(output_path)
            success_count += 1

    except Exception as e:
        # Best effort: report the failure and carry on with the next image
        error_count += 1
        print(f"Error processing {source.name}: {e}")

print(f"Successfully cropped: {success_count}")
print(f"Skipped: {skip_count}")
print(f"Errors: {error_count}")
print(f"Output directory: {output_dir}")

## Update dataset1 label paths to point to cropped images
Replace original image paths in `dataset1_labels.json` with the paths to the newly created cropped images. Warnings are printed for any missing cropped images.

In [None]:
# Update dataset1_labels.json to point to the cropped images instead of the originals

import json
from pathlib import Path

# Load current labels
with open('dataset1_labels.json', 'r', encoding='utf-8') as f:
    dataset1_labels = json.load(f)

cropped_dir = Path(r"dataset1_cropped")

# Build a new dictionary keyed by the cropped image paths; entries without a
# cropped counterpart are reported and left out.
updated_labels = {}
for old_path, label in dataset1_labels.items():
    new_path = cropped_dir / Path(old_path).name
    if new_path.exists():
        updated_labels[str(new_path.resolve())] = label
    else:
        print(f"Warning: cropped image not found: {new_path.name}")

if updated_labels:
    # Save updated labels
    with open('dataset1_labels.json', 'w', encoding='utf-8') as f:
        json.dump(updated_labels, f, ensure_ascii=False, indent=2)

    print(f"Updated {len(updated_labels)} paths to point to cropped images")
    print(f"Sample: {next(iter(updated_labels))}")
else:
    # Writing an empty mapping would wipe every label, and taking the first
    # key of an empty mapping would raise IndexError.
    print("No cropped images were found; dataset1_labels.json was left unchanged.")

## Verify cropped image dimensions (Dataset 1)
Plot the distribution of the cropped images to ensure crop_size = 256 was applied as intended.

In [None]:
# Plot the distribution of image dimensions for the cropped images listed in dataset1_labels.json

import random
from PIL import Image
with open('dataset1_labels.json', 'r', encoding='utf-8') as f:
    dataset1_labels = json.load(f)

# random.sample raises ValueError when asked for more items than the population
# holds, so the sample size is capped at the number of available images.
all_paths = list(dataset1_labels)
if not all_paths:
    raise ValueError("dataset1_labels.json contains no images to sample")
sample_paths = random.sample(all_paths, min(3000, len(all_paths)))

dimensions = []
# A dedicated loop variable keeps the `path` set by the download cells intact
for sample_path in sample_paths:
    with Image.open(sample_path) as img:
        dimensions.append(img.size)  # (width, height)
widths, heights = zip(*dimensions)

plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.hist(widths, bins=20, color='blue', alpha=0.7)
plt.title('Cropped Image Width Distribution (Dataset 1)')
plt.xlabel('Width (pixels)')
plt.ylabel('Frequency')
plt.subplot(1, 2, 2)
plt.hist(heights, bins=20, color='green', alpha=0.7)
plt.title('Cropped Image Height Distribution (Dataset 1)')
plt.xlabel('Height (pixels)')
plt.ylabel('Frequency')
plt.tight_layout()
plt.show()

# Merge the two datasets

* A basic problem is the mismatch in how each dataset labels the classes.

In [None]:
# List the distinct class labels found in dataset1_labels.json, sorted alphabetically
dataset1_labels = json.loads(Path('dataset1_labels.json').read_text(encoding='utf-8'))
distinct_labels = {*dataset1_labels.values()}
print(f"Distinct class labels in dataset1_labels.json ({len(distinct_labels)} total):")
for class_label in sorted(distinct_labels):
    print(f"- {class_label}")

In [None]:
# List the distinct class labels found in dataset2_labels.json, sorted alphabetically
dataset2_labels = json.loads(Path('dataset2_labels.json').read_text(encoding='utf-8'))
distinct_labels2 = {*dataset2_labels.values()}
print(f"Distinct class labels in dataset2_labels.json ({len(distinct_labels2)} total):")
for class_label in sorted(distinct_labels2):
    print(f"- {class_label}")

* The only real difference between the two label sets is how the speed-limit signs are named.

In [None]:
# Lower-cased alias -> canonical label.
# Only labels that should be merged across the two datasets belong here.
synonym_map_lower = {
    # Speed limits that appear in both datasets
    f"speed limit ({kmh}km/h)": f"Speed Limit {kmh}"
    for kmh in (30, 40, 50, 60, 70, 80)
    # To merge another limit, e.g. "speed limit (15km/h)" <-> "Speed Limit 15",
    # add its number to the tuple above.
}

def unify_label(label: str) -> str:
    """
    Return the canonical form of *label*.

    The lookup ignores case and surrounding whitespace. A known alias is
    replaced by its canonical label; any other label is returned exactly as
    it was passed in (not stripped, not lower-cased).
    """
    canonical = synonym_map_lower.get(label.strip().lower())
    if canonical is None:
        return label
    return canonical

# One dict holding every key from both datasets, each value replaced by its
# unified label. dataset1 goes in first; dataset2 follows and overwrites the
# value of any key the two share.
merged_labels = {
    image_key: unify_label(raw_label)
    for image_key, raw_label in dataset1_labels.items()
}
merged_labels.update(
    (image_key, unify_label(raw_label))
    for image_key, raw_label in dataset2_labels.items()
)

# Save to a new JSON file
with open('merged_labels.json', 'w', encoding='utf-8') as out_file:
    json.dump(merged_labels, out_file, ensure_ascii=False, indent=2)

print("Merged labels written to merged_labels.json")

In [None]:
# Rewrite the absolute image paths in merged_labels.json as paths relative to
# the current working directory.

infile = 'merged_labels.json'
if Path(infile).exists():
    with open(infile, 'r', encoding='utf-8') as f:
        data = json.load(f)

    updated = {}
    for key, val in data.items():
        new_key = key  # relative keys are kept as they are
        try:
            candidate = Path(key)
            if candidate.is_absolute():
                relative = os.path.relpath(str(candidate), start=os.getcwd())
                # forward slashes everywhere, for consistency across platforms
                new_key = relative.replace('\\', '/')
        except Exception:
            # Any unexpected path problem: fall back to the original key
            new_key = key
        updated[new_key] = val

    with open(infile, 'w', encoding='utf-8') as f:
        json.dump(updated, f, ensure_ascii=False, indent=2)

    print(f"Updated {len(updated)} entries in {infile} — absolute paths converted where possible.")
    # Preview the first ten keys, numbered from 1
    for position, shown_key in enumerate(list(updated)[:10], start=1):
        print(f"{position}. {shown_key}")
else:
    print(f"{infile} not found — nothing to update.")