In [1]:
"""
Complete EDA and Dataset Splitting Script
Analyzes unified dataset and creates stratified (or random) train/val/test splits
Handles single-class datasets gracefully (like 'corrosion' only)

Run in Jupyter or as standalone script:
    python notebooks/01_eda_and_split.py
"""

import json
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from collections import Counter, defaultdict
from sklearn.model_selection import train_test_split
import shutil
from tqdm import tqdm

class DatasetAnalyzer:
    def __init__(self, coco_json_path):
        self.coco_path = Path(coco_json_path)
        with open(coco_json_path, 'r') as f:
            self.coco_data = json.load(f)
        
        self.images = self.coco_data.get('images', [])
        self.annotations = self.coco_data.get('annotations', [])
        self.categories = {c['id']: c['name'] for c in self.coco_data.get('categories', [])}
        
        # Create lookups
        self.img_to_anns = defaultdict(list)
        for ann in self.annotations:
            self.img_to_anns[ann['image_id']].append(ann)
    
    def basic_stats(self):
        print("=" * 60)
        print("DATASET STATISTICS")
        print("=" * 60)
        print(f"Total Images: {len(self.images)}")
        print(f"Total Annotations: {len(self.annotations)}")
        print(f"Categories: {len(self.categories)}")
        
        cat_counts = Counter([ann['category_id'] for ann in self.annotations])
        if not cat_counts:
            print("\n‚ö† No annotations found!")
            return
        
        print(f"\nCategory Distribution:")
        for cat_id, count in cat_counts.most_common():
            cat_name = self.categories.get(cat_id, 'unknown')
            percentage = (count / len(self.annotations)) * 100
            print(f"  {cat_name}: {count} ({percentage:.1f}%)")
        
        print("=" * 60)
    
    def analyze_images(self):
        print("\nANALYZING IMAGE PROPERTIES...")
        widths, heights, aspect_ratios = [], [], []
        file_sizes = []
        
        for img in tqdm(self.images):
            widths.append(img.get('width', 0))
            heights.append(img.get('height', 0))
            if img.get('height', 0) > 0:
                aspect_ratios.append(img.get('width', 1) / img.get('height', 1))
            
            if 'path' in img and Path(img['path']).exists():
                file_sizes.append(Path(img['path']).stat().st_size / 1024)  # KB
        
        stats = {
            'Width': {'min': np.min(widths), 'max': np.max(widths), 'mean': np.mean(widths), 'std': np.std(widths)},
            'Height': {'min': np.min(heights), 'max': np.max(heights), 'mean': np.mean(heights), 'std': np.std(heights)},
            'Aspect Ratio': {'min': np.min(aspect_ratios), 'max': np.max(aspect_ratios), 'mean': np.mean(aspect_ratios), 'std': np.std(aspect_ratios)}
        }
        
        print("\nImage Dimension Stats:")
        for metric, values in stats.items():
            print(f"  {metric}: mean={values['mean']:.2f}, std={values['std']:.2f}, min={values['min']:.2f}, max={values['max']:.2f}")
        
        if file_sizes:
            print(f"\nFile Sizes: Mean={np.mean(file_sizes):.2f} KB, Range=[{np.min(file_sizes):.2f}, {np.max(file_sizes):.2f}]")
        
        return stats, widths, heights, aspect_ratios
    
    def analyze_annotations(self):
        print("\nANALYZING ANNOTATIONS...")
        if not self.annotations:
            print("‚ö† No annotations found ‚Äî skipping.")
            return [], []
        
        bbox_areas, anns_per_image = [], []
        
        for img in self.images:
            anns = self.img_to_anns[img['id']]
            anns_per_image.append(len(anns))
            for ann in anns:
                w, h = ann['bbox'][2], ann['bbox'][3]
                bbox_areas.append(w * h)
        
        print(f"  Mean bbox area: {np.mean(bbox_areas):.2f}")
        print(f"  Max per-image annotations: {max(anns_per_image)}")
        return bbox_areas, anns_per_image
    
    def visualize_samples(self, num_samples=12, save_dir="results/figures"):
        print("\nGENERATING SAMPLE VISUALIZATIONS...")
        save_dir = Path(save_dir)
        save_dir.mkdir(parents=True, exist_ok=True)
        
        if len(self.images) == 0:
            print("‚ö† No images to visualize.")
            return
        
        sample_imgs = np.random.choice(self.images, min(num_samples, len(self.images)), replace=False)
        fig, axes = plt.subplots(3, 4, figsize=(20, 15))
        axes = axes.flatten()
        
        for idx, img_info in enumerate(sample_imgs):
            if 'path' not in img_info or not Path(img_info['path']).exists():
                continue
            
            img = cv2.imread(img_info['path'])
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            anns = self.img_to_anns[img_info['id']]
            
            for ann in anns:
                x, y, w, h = map(int, ann['bbox'])
                color = (255, 0, 0)  # red box for corrosion
                cv2.rectangle(img, (x, y), (x + w, y + h), color, 2)
                cat_name = self.categories.get(ann['category_id'], 'corrosion')
                cv2.putText(img, cat_name, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
            
            axes[idx].imshow(img)
            axes[idx].set_title(f"{Path(img_info['file_name']).stem}\n{len(anns)} annotations")
            axes[idx].axis('off')
        
        plt.tight_layout()
        plt.savefig(save_dir / "sample_annotations.png", dpi=150, bbox_inches='tight')
        print(f"‚úì Saved to: {save_dir / 'sample_annotations.png'}")
        plt.close()
    
    def plot_distributions(self, widths, heights, bbox_areas, save_dir="results/figures"):
        print("\nGENERATING DISTRIBUTION PLOTS...")
        save_dir = Path(save_dir)
        save_dir.mkdir(parents=True, exist_ok=True)
        
        fig, axes = plt.subplots(2, 3, figsize=(18, 10))
        
        axes[0, 0].hist(widths, bins=30, edgecolor='black', alpha=0.7)
        axes[0, 0].set_title('Image Widths')
        
        axes[0, 1].hist(heights, bins=30, edgecolor='black', alpha=0.7)
        axes[0, 1].set_title('Image Heights')
        
        aspect_ratios = [w / h for w, h in zip(widths, heights) if h > 0]
        axes[0, 2].hist(aspect_ratios, bins=30, edgecolor='black', alpha=0.7)
        axes[0, 2].set_title('Aspect Ratios')
        
        if bbox_areas:
            axes[1, 0].hist(bbox_areas, bins=50, edgecolor='black', alpha=0.7)
            axes[1, 0].set_title('Bounding Box Areas')
            axes[1, 0].set_yscale('log')
        
        cat_counts = Counter([ann['category_id'] for ann in self.annotations])
        if cat_counts:
            axes[1, 1].bar([self.categories[c] for c in cat_counts.keys()], list(cat_counts.values()), edgecolor='black', alpha=0.7)
            axes[1, 1].set_title('Category Distribution')
        else:
            axes[1, 1].text(0.5, 0.5, 'No category data', ha='center', va='center')
        
        anns_per_img = [len(self.img_to_anns[img['id']]) for img in self.images]
        axes[1, 2].hist(anns_per_img, bins=range(max(anns_per_img)+2), edgecolor='black', alpha=0.7)
        axes[1, 2].set_title('Annotations per Image')
        
        plt.tight_layout()
        plt.savefig(save_dir / "data_distributions.png", dpi=150, bbox_inches='tight')
        print(f"‚úì Saved to: {save_dir / 'data_distributions.png'}")
        plt.close()

class DatasetSplitter:
    def __init__(self, coco_json_path, output_dir="data/processed"):
        self.coco_path = Path(coco_json_path)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        
        with open(coco_json_path, 'r') as f:
            self.coco_data = json.load(f)
    
    def stratified_split(self, train_ratio=0.70, val_ratio=0.15, test_ratio=0.15, random_state=42):
        print("\n" + "=" * 60)
        print("CREATING DATA SPLITS")
        print("=" * 60)
        
        images = self.coco_data['images']
        annotations = self.coco_data['annotations']
        img_to_cats = defaultdict(list)
        for ann in annotations:
            img_to_cats[ann['image_id']].append(ann['category_id'])
        
        image_labels = []
        for img in images:
            cats = img_to_cats[img['id']]
            primary_cat = Counter(cats).most_common(1)[0][0] if cats else 0
            image_labels.append(primary_cat)
        
        unique_labels = list(set(image_labels))
        if len(unique_labels) < 2:
            print("‚ö† Only one class detected ‚Äî using random split instead.")
            train_val_imgs, test_imgs = train_test_split(images, test_size=test_ratio, random_state=random_state)
            val_ratio_adj = val_ratio / (train_ratio + val_ratio)
            train_imgs, val_imgs = train_test_split(train_val_imgs, test_size=val_ratio_adj, random_state=random_state)
        else:
            train_val_imgs, test_imgs, train_val_labels, test_labels = train_test_split(
                images, image_labels, test_size=test_ratio, stratify=image_labels, random_state=random_state)
            val_ratio_adj = val_ratio / (train_ratio + val_ratio)
            train_imgs, val_imgs, train_labels, val_labels = train_test_split(
                train_val_imgs, train_val_labels, test_size=val_ratio_adj, stratify=train_val_labels, random_state=random_state)
        
        splits = {'train': train_imgs, 'val': val_imgs, 'test': test_imgs}
        print(f"  Train: {len(train_imgs)}, Val: {len(val_imgs)}, Test: {len(test_imgs)}")
        return splits
    
    def save_splits(self, splits):
        for split_name, split_imgs in splits.items():
            split_img_ids = {img['id'] for img in split_imgs}
            split_anns = [ann for ann in self.coco_data['annotations'] if ann['image_id'] in split_img_ids]
            split_data = {
                'info': self.coco_data['info'],
                'images': split_imgs,
                'annotations': split_anns,
                'categories': self.coco_data['categories']
            }
            output_path = self.output_dir / f"{split_name}_annotations.json"
            with open(output_path, 'w') as f:
                json.dump(split_data, f, indent=2)
            print(f"‚úì Saved {split_name} annotations to {output_path}")

def main():
    print("\n" + "=" * 70)
    print(" " * 15 + "DAY 1: EDA & DATASET SPLITTING")
    print("=" * 70)
    
    coco_json = "data/processed/unified/unified_annotations.json"
    if not Path(coco_json).exists():
        print(f"‚úó Missing: {coco_json}")
        return
    
    analyzer = DatasetAnalyzer(coco_json)
    analyzer.basic_stats()
    stats, widths, heights, aspect_ratios = analyzer.analyze_images()
    bbox_areas, anns_per_img = analyzer.analyze_annotations()
    analyzer.visualize_samples(num_samples=12)
    analyzer.plot_distributions(widths, heights, bbox_areas)
    
    splitter = DatasetSplitter(coco_json)
    splits = splitter.stratified_split()
    splitter.save_splits(splits)
    
    print("\n‚úì DAY 1 COMPLETE ‚Äì Visuals in results/figures, splits in data/processed")

if __name__ == "__main__":
    main()



               DAY 1: EDA & DATASET SPLITTING
DATASET STATISTICS
Total Images: 561
Total Annotations: 3056
Categories: 1

Category Distribution:
  corrosion: 3056 (100.0%)

ANALYZING IMAGE PROPERTIES...


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 561/561 [00:00<00:00, 2021.34it/s]



Image Dimension Stats:
  Width: mean=886.96, std=747.15, min=640.00, max=4032.00
  Height: mean=969.61, std=993.97, min=640.00, max=4032.00
  Aspect Ratio: mean=0.98, std=0.08, min=0.75, max=1.33

File Sizes: Mean=205.93 KB, Range=[19.62, 2437.70]

ANALYZING ANNOTATIONS...
  Mean bbox area: 112063.90
  Max per-image annotations: 310

GENERATING SAMPLE VISUALIZATIONS...
‚úì Saved to: results\figures\sample_annotations.png

GENERATING DISTRIBUTION PLOTS...
‚úì Saved to: results\figures\data_distributions.png

CREATING DATA SPLITS
  Train: 392, Val: 84, Test: 85
‚úì Saved train annotations to data\processed\train_annotations.json
‚úì Saved val annotations to data\processed\val_annotations.json
‚úì Saved test annotations to data\processed\test_annotations.json

‚úì DAY 1 COMPLETE ‚Äì Visuals in results/figures, splits in data/processed


In [1]:
"""
01_eda_and_split.py
Exploratory Data Analysis + Train/Val/Test Split
for the unified corrosion dataset
"""

import json
import random
from pathlib import Path
from collections import defaultdict
import matplotlib.pyplot as plt
import cv2
import os
import shutil
from sklearn.model_selection import train_test_split

# =============================================================
# PATHS
# =============================================================
try:
    ROOT = Path(__file__).resolve().parent
except NameError:
    # Fallback for notebooks
    ROOT = Path(os.getcwd())

DATA_DIR = ROOT / "data"
UNIFIED_JSON = DATA_DIR / "processed" / "unified" / "unified_annotations.json"
RESULTS_DIR = ROOT / "results" / "figures"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)



# =============================================================
# LOAD COCO JSON
# =============================================================
print(f"üìÇ Loading unified dataset: {UNIFIED_JSON}")
with open(UNIFIED_JSON, "r") as f:
    coco_data = json.load(f)

images = coco_data["images"]
annotations = coco_data["annotations"]
categories = coco_data["categories"]
print(f"‚úÖ Loaded {len(images)} images, {len(annotations)} annotations, {len(categories)} categories.")

# =============================================================
# EDA: CATEGORY COUNTS
# =============================================================
cat_counts = defaultdict(int)
for ann in annotations:
    cat_counts[ann["category_id"]] += 1

cat_names = {c["id"]: c["name"] for c in categories}

plt.figure(figsize=(6, 4))
plt.bar([cat_names[k] for k in cat_counts.keys()], cat_counts.values(), color="steelblue")
plt.title("Annotation Counts per Category")
plt.ylabel("Count")
plt.tight_layout()
plt.savefig(RESULTS_DIR / "data_distributions.png", dpi=300)
plt.close()

print(f"üìä Saved category distribution plot ‚Üí {RESULTS_DIR / 'data_distributions.png'}")

# =============================================================
# VISUALIZE RANDOM SAMPLES
# =============================================================
def show_random_samples(coco_json_path, num_samples=6):
    with open(coco_json_path, 'r') as f:
        coco_data = json.load(f)

    images = coco_data['images']
    annotations = coco_data['annotations']
    categories = {c['id']: c['name'] for c in coco_data['categories']}

    img_to_anns = defaultdict(list)
    for ann in annotations:
        img_to_anns[ann['image_id']].append(ann)

    samples = random.sample(images, min(num_samples, len(images)))

    plt.figure(figsize=(16, 10))
    for idx, img_info in enumerate(samples):
        img_path = Path(img_info.get("path", "")) or Path(img_info["file_name"])
        if not img_path.exists():
            continue

        img = cv2.imread(str(img_path))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        anns = img_to_anns[img_info["id"]]

        for ann in anns:
            x, y, w, h = map(int, ann["bbox"])
            color = (255, 0, 0)
            cv2.rectangle(img, (x, y), (x + w, y + h), color, 2)
            cat_name = categories.get(ann["category_id"], "corrosion")
            cv2.putText(img, cat_name, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        plt.subplot(2, (num_samples + 1)//2, idx + 1)
        plt.imshow(img)
        plt.axis("off")
        plt.title(f"{Path(img_info['file_name']).stem} ({len(anns)} boxes)")

    plt.tight_layout()
    plt.savefig(RESULTS_DIR / "sample_annotations.png", dpi=300)
    plt.show()

show_random_samples(UNIFIED_JSON, num_samples=6)
print(f"üñºÔ∏è Saved sample visualization ‚Üí {RESULTS_DIR / 'sample_annotations.png'}")

# =============================================================
# TRAIN/VAL/TEST SPLIT
# =============================================================
train_imgs, test_imgs = train_test_split(images, test_size=0.2, random_state=42)
val_imgs, test_imgs = train_test_split(test_imgs, test_size=0.5, random_state=42)

splits = {
    "train": train_imgs,
    "val": val_imgs,
    "test": test_imgs
}

for split_name, split_imgs in splits.items():
    img_ids = {img["id"] for img in split_imgs}
    split_anns = [a for a in annotations if a["image_id"] in img_ids]
    out_data = {
        "images": split_imgs,
        "annotations": split_anns,
        "categories": categories
    }
    out_path = DATA_DIR / "processed" / f"{split_name}_annotations.json"
    with open(out_path, "w") as f:
        json.dump(out_data, f, indent=2)
    print(f"üíæ Saved {split_name} set ‚Üí {out_path} ({len(split_imgs)} imgs, {len(split_anns)} anns)")

print("‚úÖ EDA and dataset split complete.")


üìÇ Loading unified dataset: C:\Users\Blue\corrosion_detection\src\data\data\processed\unified\unified_annotations.json


FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\Blue\\corrosion_detection\\src\\data\\data\\processed\\unified\\unified_annotations.json'

In [5]:
# ============================================
# CLEAN PROJECT STRUCTURE VIEWER
# ============================================
from pathlib import Path

def print_folder_structure(base_dir=None, max_depth=3, skip_heavy_dirs=True):
    """
    Prints the folder structure of the corrosion_detection project.
    Automatically detects project root.
    Skips deep image folders (train/, test/, valid/, etc.) for clarity.
    """
    try:
        current = Path(__file__).resolve()
    except NameError:
        current = Path.cwd().resolve()

    # Detect top-level project folder
    while current.name.lower() != "corrosion_detection" and current.parent != current:
        current = current.parent

    base_path = current if base_dir is None else Path(base_dir).resolve()
    print(f"\nüìÇ Folder structure for: {base_path}")
    print("=" * 60)

    def recurse(path, depth=0):
        if max_depth is not None and depth > max_depth:
            return
        indent = "‚îÇ   " * depth
        for item in sorted(path.iterdir()):
            if item.name.startswith("."):
                continue
            if item.is_dir():
                # Skip heavy folders to avoid visual clutter
                if skip_heavy_dirs and item.name.lower() in {
                    "train", "test", "valid", "images", "__pycache__"
                }:
                    print(f"{indent}üìÅ {item.name}/ ... (skipped)")
                    continue

                print(f"{indent}üìÅ {item.name}/")
                recurse(item, depth + 1)
            else:
                # Only show small number of representative files
                if depth <= 1 and item.suffix in {".py", ".ipynb", ".json", ".txt"}:
                    print(f"{indent}üìÑ {item.name}")

    recurse(base_path)
    print("=" * 60)


# Example usage
print_folder_structure(max_depth=3)



üìÇ Folder structure for: C:\Users\Blue\corrosion_detection
üìÅ config/
üìÅ data/
‚îÇ   üìÅ organized/
‚îÇ   ‚îÇ   üìÅ bmvc_corrosion/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ annotations/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ images/ ... (skipped)
‚îÇ   ‚îÇ   üìÅ github_datasets/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ annotations/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ images/ ... (skipped)
‚îÇ   ‚îÇ   üìÅ kaggle_pipeline/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ annotations/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ images/ ... (skipped)
‚îÇ   ‚îÇ   üìÅ roboflow_corrosao/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ annotations/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ images/ ... (skipped)
‚îÇ   ‚îÇ   üìÅ roboflow_inpipe/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ annotations/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ images/ ... (skipped)
‚îÇ   ‚îÇ   üìÅ test_samples/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ annotations/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ images/ ... (skipped)
‚îÇ   üìÅ processed/
‚îÇ   ‚îÇ   üìÅ unified/
‚îÇ   üìÅ raw/
‚îÇ   ‚îÇ   üìÅ bmvc_corrosion/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ cross_val_1/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ cross_val_10/
‚îÇ   ‚îÇ   ‚îÇ   üìÅ cross_val_2