In [1]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.191-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.16-py3-none-any.whl.metadata (14 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading nv

In [2]:
import os
import cv2
import yaml
import pickle
import shutil
import random
import numpy as np
import torch
import matplotlib.pyplot as plt
from pathlib import Path
from ultralytics import YOLO

def set_seed(seed: int = 42):
    """Seed every random number generator used in this notebook.

    Seeds the stdlib ``random`` module, NumPy, and PyTorch (CPU and all CUDA
    devices), then sets ``cudnn.deterministic = True`` and
    ``cudnn.benchmark = False``.

    Args:
        seed: Value handed to each generator.
    """
    # Seeding order is unchanged: stdlib, NumPy, torch CPU, torch CUDA.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

    print(f"Random seed fixed at {seed}")



Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


# ID Bound

## Loading Data

In [3]:
import os
import cv2
import yaml
import pickle
import shutil
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from ultralytics import YOLO


def load_label_file(label_path: Path):
    """Parse a label file into ``[class_id, x_center, y_center, width, height]`` rows.

    Each line is split on whitespace:
      * fewer than five tokens -> the line is ignored;
      * exactly five tokens    -> read directly as class id plus box values;
      * more than five tokens  -> the values after the class id are read as an
        alternating x/y coordinate list and reduced to the axis-aligned box
        that encloses them.

    Args:
        label_path: Path of the ``.txt`` label file.

    Returns:
        A list of 5-element lists; empty when the file does not exist.
    """
    if not label_path.exists():
        return []

    with open(label_path, "r") as handle:
        raw_lines = handle.readlines()

    boxes = []
    for raw in raw_lines:
        tokens = raw.strip().split()
        if len(tokens) < 5:
            continue

        label = int(tokens[0])
        values = [float(tok) for tok in tokens[1:]]

        if len(tokens) == 5:
            cx, cy, box_w, box_h = values
        else:
            # Even positions are x coordinates, odd positions are y coordinates.
            xs, ys = values[0::2], values[1::2]
            left, right = min(xs), max(xs)
            top, bottom = min(ys), max(ys)
            cx = (left + right) / 2
            cy = (top + bottom) / 2
            box_w = right - left
            box_h = bottom - top

        boxes.append([label, cx, cy, box_w, box_h])

    return boxes


def filter_label_0_data(base_path, split="train"):
    """Collect the images of a split that carry at least one class-0 annotation.

    Looks for images under ``<base_path>/<split>/images`` and for the label
    file with the same stem under ``<base_path>/<split>/labels``.

    Image discovery is case-insensitive and accepts ``.jpg``, ``.jpeg`` and
    ``.png`` (the same extensions ``IDRotationDataset`` accepts), and the files
    are processed in sorted order so the result is reproducible across runs.

    Args:
        base_path: Dataset root directory.
        split: Sub-directory name of the split (e.g. ``"train"``).

    Returns:
        A list of ``{"image_path": Path, "annotations": [...]}`` dicts, where
        ``annotations`` only contains the rows whose class id is 0.
    """
    base_path = Path(base_path)
    images_dir = base_path / split / "images"
    labels_dir = base_path / split / "labels"

    image_suffixes = {".jpg", ".jpeg", ".png"}
    image_files = sorted(
        candidate
        for candidate in images_dir.glob("*")
        if candidate.is_file() and candidate.suffix.lower() in image_suffixes
    )

    filtered_data = []
    for image_path in image_files:
        annotations = load_label_file(labels_dir / f"{image_path.stem}.txt")
        class_0_annotations = [ann for ann in annotations if ann[0] == 0]
        if class_0_annotations:
            filtered_data.append({"image_path": image_path, "annotations": class_0_annotations})

    print(f"[{split}] {len(filtered_data)} images with label 0")
    return filtered_data


def visualize_samples(filtered_data, num_samples=6):
    """Show the first few samples in a 3-column grid with their boxes drawn.

    Args:
        filtered_data: List of dicts with ``"image_path"`` and ``"annotations"``
            (rows of ``class_id, x_center, y_center, width, height``; the box
            values are scaled by the image width/height before drawing).
        num_samples: Upper bound on how many samples are shown.
    """
    if not filtered_data:
        print("No data to visualize.")
        return

    count = min(num_samples, len(filtered_data))
    cols = 3
    rows = (count + 2) // 3
    plt.figure(figsize=(15, 5 * rows))

    for idx in range(count):
        item = filtered_data[idx]
        frame = cv2.imread(str(item["image_path"]))
        if frame is None:
            # Unreadable image: its grid slot is simply left empty.
            continue
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img_h, img_w = frame.shape[:2]

        for _class_id, cx, cy, box_w, box_h in item["annotations"]:
            top_left = (int((cx - box_w / 2) * img_w), int((cy - box_h / 2) * img_h))
            bottom_right = (int((cx + box_w / 2) * img_w), int((cy + box_h / 2) * img_h))
            cv2.rectangle(frame, top_left, bottom_right, (255, 0, 0), 2)

        plt.subplot(rows, cols, idx + 1)
        plt.imshow(frame)
        plt.title(item["image_path"].name)
        plt.axis("off")

    plt.tight_layout()
    plt.show()


# def analyze_dataset_format(base_path, split="train"):
#     base_path = Path(base_path)
#     labels_dir = base_path / split / "labels"
#     label_files = list(labels_dir.glob("*.txt"))

#     bbox, seg, files_bbox, files_seg, files_mixed = 0, 0, 0, 0, 0
#     for lf in label_files:
#         anns = load_label_file(lf)
#         bbox_count = sum(1 for ann in anns if len(ann) == 5)
#         seg_count = len(anns) - bbox_count
#         bbox += bbox_count
#         seg += seg_count
#         if bbox_count > 0 and seg_count > 0:
#             files_mixed += 1
#         elif bbox_count > 0:
#             files_bbox += 1
#         elif seg_count > 0:
#             files_seg += 1

#     print(f"\n=== {split} set ===")
#     print(f"Files: {len(label_files)} | BBox: {bbox} | Seg: {seg}")
#     print(f"Files (bbox only): {files_bbox}, (seg only): {files_seg}, mixed: {files_mixed}")




## Train and Hyperparameter Tuning

In [None]:


# Content of the dataset YAML handed to Ultralytics.
DATASET_CONFIG = dict(
    path='/kaggle/input/id-bound',
    train='train/images',
    val='valid/images',
    test='test/images',
    nc=1,
    names=['ID'],
)

# Run 1 settings; every other run is this baseline with a few values replaced.
_BASELINE_RUN = {
    'lr0': 0.002, 'weight_decay': 0.0005, 'imgsz': 640, 'batch': 16, 'epochs': 35,
    'degrees': 2, 'translate': 0.05, 'scale': 0.1, 'mosaic': 0.5,
    'hsv_h': 0.015, 'hsv_s': 0.7, 'hsv_v': 0.4,
}

# One entry per run, listing only what differs from the baseline.
_RUN_OVERRIDES = [
    {},                                   # run 1: baseline
    {'lr0': 0.001},                       # run 2: lower learning rate
    {'imgsz': 800, 'batch': 8},           # run 3: larger images, smaller batch
    {'degrees': 5, 'translate': 0.1, 'scale': 0.2, 'mosaic': 0.8,
     'hsv_h': 0.03, 'hsv_s': 0.5, 'hsv_v': 0.3},   # run 4: stronger geometric augmentation
    {'degrees': 0, 'translate': 0.02, 'scale': 0.05, 'mosaic': 0.2,
     'hsv_h': 0.01, 'hsv_s': 0.3, 'hsv_v': 0.2},   # run 5: lighter augmentation
]

# Overrides only replace existing keys, so every dict keeps the baseline key order.
HYPERPARAMS = [{**_BASELINE_RUN, **override} for override in _RUN_OVERRIDES]

CHECKPOINT_FILE = '/kaggle/working/tuning_checkpoint.pkl'
DATASET_YAML = 'dataset.yaml'
PROJECT_NAME = 'hyperparam_tuning'


def save_dataset_yaml(config: dict, filename: str = DATASET_YAML):
    """Write ``config`` to ``filename`` as YAML and report where it was saved.

    Args:
        config: Dataset description to serialise.
        filename: Destination path; defaults to ``DATASET_YAML``.
    """
    with open(filename, mode='w') as yaml_file:
        yaml.dump(config, yaml_file)

    print(f"Dataset configuration saved to {filename}")

def load_checkpoint(filepath: str):
    """Load the tuning checkpoint, or return a fresh state if none is usable.

    Args:
        filepath: Path of the pickle file written by ``save_checkpoint``.

    Returns:
        The unpickled checkpoint dict when the file exists, can be read and
        contains both ``'last_completed_run'`` and a list under
        ``'results_log'``. Otherwise ``{'last_completed_run': -1,
        'results_log': []}``.
    """
    fresh_state = {'last_completed_run': -1, 'results_log': []}

    if not os.path.exists(filepath):
        print("Starting fresh hyperparameter tuning...")
        return fresh_state

    try:
        # SECURITY NOTE: pickle.load can execute arbitrary code. Only point this
        # at checkpoint files produced by this notebook, never at untrusted data.
        with open(filepath, 'rb') as f:
            data = pickle.load(f)

        # The caller indexes both keys right away; reject incomplete checkpoints
        # here instead of failing later with a confusing KeyError.
        if not isinstance(data, dict) or not isinstance(data.get('results_log'), list):
            raise ValueError("checkpoint has no 'results_log' list")

        print(f"Resuming from run {data['last_completed_run'] + 1}")
        return data
    except Exception as exc:
        # `Exception` (not a bare except) so Ctrl-C / SystemExit still propagate,
        # and the reason is reported rather than silently discarded.
        print(f"Checkpoint exists but not readable ({exc!r}), starting fresh...")

    return fresh_state

def save_checkpoint(filepath: str, last_run: int, results_log: list, best_map: float, best_params: dict, best_model_path: str):
    """Persist the tuning progress so an interrupted session can resume.

    The checkpoint is first written to ``<filepath>.tmp`` and then moved over
    ``filepath`` with ``os.replace``, so a failure while pickling never
    truncates the previous good checkpoint.

    Saving is best-effort: failures are reported on stdout and do not raise.

    Args:
        filepath: Destination of the pickle file.
        last_run: Zero-based index of the last completed run.
        results_log: List of per-run result dicts.
        best_map: Best mAP@0.5 seen so far.
        best_params: Hyperparameters of the best run.
        best_model_path: Path to the weights of the best run.
    """
    data = {
        'last_completed_run': last_run,
        'results_log': results_log,
        'best_map': best_map,
        'best_params': best_params,
        'best_model_path': best_model_path
    }
    tmp_path = f"{filepath}.tmp"
    try:
        with open(tmp_path, 'wb') as f:
            pickle.dump(data, f)
        os.replace(tmp_path, filepath)
        print(f"Checkpoint saved (run {last_run + 1})")
    except Exception as exc:
        # Deliberately non-fatal, but say why it failed.
        print(f"Checkpoint save failed (continuing anyway): {exc!r}")
        try:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        except OSError:
            # Leftover temp file is harmless; nothing more to do.
            pass

def train_single_run(run_idx: int, params: dict):
    """Train one ``yolov8n.pt`` model with ``params`` and validate it.

    Args:
        run_idx: Zero-based index of the configuration; used in the log header
            and in the run directory name (``run_<run_idx+1>``).
        params: Hyperparameter dict with the keys used in ``HYPERPARAMS``.

    Returns:
        ``(metrics.box.map50, metrics.box.map)`` from ``model.val``.
    """
    print(f"\n=== Configuration {run_idx+1}/{len(HYPERPARAMS)} ===")
    print(f"lr0: {params['lr0']}, weight_decay: {params['weight_decay']}, imgsz: {params['imgsz']}, batch: {params['batch']}")
    print(f"degrees: {params['degrees']}, translate: {params['translate']}, scale: {params['scale']}")
    print(f"mosaic: {params['mosaic']}, hsv_h: {params['hsv_h']}, hsv_s: {params['hsv_s']}")

    model = YOLO('yolov8n.pt')

    # Values that vary between configurations are copied straight from `params`.
    tuned_keys = (
        'epochs', 'imgsz', 'batch', 'lr0', 'weight_decay',
        'degrees', 'translate', 'scale', 'mosaic',
        'hsv_h', 'hsv_s', 'hsv_v',
    )
    tuned_settings = {key: params[key] for key in tuned_keys}

    # Values shared by every run.
    fixed_settings = dict(
        momentum=0.937,
        device=0,
        project=PROJECT_NAME,
        name=f'run_{run_idx+1}',
        save=True,
        plots=True,
        verbose=False,
        patience=15,
        shear=2,
        perspective=0.0,
        flipud=0.0,
        fliplr=0.0,
        mixup=0.0,
        copy_paste=0.0,
    )

    model.train(data=DATASET_YAML, **tuned_settings, **fixed_settings)

    box_metrics = model.val(data=DATASET_YAML).box
    return box_metrics.map50, box_metrics.map

def display_results(results_log, best_map, best_params):
    """Print a table of all runs followed by the best configuration.

    Args:
        results_log: List of per-run dicts containing at least ``run``,
            ``lr0``, ``weight_decay``, ``imgsz``, ``batch``, ``degrees``,
            ``mosaic`` and ``mAP@0.5``.
        best_map: Best mAP@0.5 across the runs.
        best_params: Hyperparameters of the best run, or ``None`` when no run
            completed; in that case a notice is printed instead of the
            best-configuration section.
    """
    print("\n=== FINAL HYPERPARAMETER TUNING RESULTS ===")
    print("Run | lr0   | w_decay | imgsz | batch | degrees | mosaic | mAP@0.5")
    print("----|-------|---------|-------|-------|---------|--------|--------")
    for r in results_log:
        print(f"{r['run']:2d}  | {r['lr0']:.3f} | {r['weight_decay']:.4f}  | {r['imgsz']:5d} | {r['batch']:5d} | {r['degrees']:7.1f} | {r['mosaic']:6.1f} | {r['mAP@0.5']:.4f}")

    if not best_params:
        # Happens when every run failed: there is nothing to call `.items()` on.
        print("\nNo best configuration available (no run completed successfully).")
        return

    print(f"\n✓ BEST CONFIGURATION:")
    for key, value in best_params.items():
        print(f"{key}: {value}")
    print(f"✓ Best mAP@0.5: {best_map:.4f}")

def visualize_results(results_log, best_map):
    """Plot mAP@0.5 per run as a bar chart, highlight the best run, and save it.

    The figure is written to ``final_hyperparameter_results.png`` and shown.
    Nothing is plotted or saved when ``results_log`` is empty.

    Args:
        results_log: List of per-run dicts with ``run`` and ``mAP@0.5``.
        best_map: Best mAP@0.5; bars equal to it are drawn in red.
    """
    if not results_log:
        # Without this guard the `next(...)` lookup below raised StopIteration.
        print("No results to visualize.")
        return

    plt.figure(figsize=(12, 8))
    runs = [r['run'] for r in results_log]
    maps = [r['mAP@0.5'] for r in results_log]
    colors = ['red' if m == best_map else 'blue' for m in maps]

    plt.bar(runs, maps, color=colors, alpha=0.7)
    plt.xlabel('Run Number')
    plt.ylabel('mAP@0.5')
    plt.title(f'Hyperparameter Tuning Results - Best mAP@0.5: {best_map:.4f}')
    plt.grid(True, alpha=0.3)

    # First run whose score equals best_map; None if the log holds no such entry.
    best_run = next((r['run'] for r in results_log if r['mAP@0.5'] == best_map), None)
    if best_run is not None:
        plt.annotate(f'Best: {best_map:.4f}',
                     xy=(best_run, best_map),
                     xytext=(best_run, best_map + 0.02),
                     arrowprops=dict(arrowstyle='->', color='red'),
                     fontsize=12, ha='center')

    plt.tight_layout()
    plt.savefig('final_hyperparameter_results.png', dpi=150, bbox_inches='tight')
    plt.show()

# --- Hyperparameter tuning driver ------------------------------------------
# Writes the dataset YAML, resumes from the checkpoint if one exists, trains
# every remaining configuration, then reports, archives and exports the best
# model.
save_dataset_yaml(DATASET_CONFIG)

checkpoint = load_checkpoint(CHECKPOINT_FILE)
start_run = checkpoint['last_completed_run'] + 1
results_log = checkpoint['results_log']
best_map = max([r['mAP@0.5'] for r in results_log], default=0)
best_params = None
best_model_path = None

# When resuming, rebuild the best-so-far state from the logged results.
if results_log:
    best_result = max(results_log, key=lambda x: x['mAP@0.5'])
    best_params = {k: v for k, v in best_result.items() if k not in ['run', 'mAP@0.5', 'mAP@0.5:0.95', 'model_path']}
    best_model_path = best_result['model_path']

for i in range(start_run, len(HYPERPARAMS)):
    params = HYPERPARAMS[i]
    try:
        current_map, current_map_avg = train_single_run(i, params)

        result_entry = {
            'run': i+1,
            **params,
            'mAP@0.5': current_map,
            'mAP@0.5:0.95': current_map_avg,
            'model_path': f'{PROJECT_NAME}/run_{i+1}/weights/best.pt'
        }
        results_log.append(result_entry)

        print(f"mAP@0.5: {current_map:.4f}, mAP@0.5:0.95: {current_map_avg:.4f}")

        # `best_params is None` makes the first successful run the incumbent even
        # if its mAP is exactly 0, so a best configuration always exists once any
        # run has completed.
        if best_params is None or current_map > best_map:
            best_map = current_map
            best_params = params.copy()
            best_model_path = result_entry['model_path']
            print(f"✓ New best mAP@0.5: {best_map:.4f}")

        save_checkpoint(CHECKPOINT_FILE, i, results_log, best_map, best_params, best_model_path)

    except Exception as e:
        # A failed configuration is reported and skipped; tuning continues.
        print(f"Error in run {i+1}: {e}")
        print("Continuing to next configuration...")
        continue

if results_log:
    display_results(results_log, best_map, best_params)
    visualize_results(results_log, best_map)
else:
    print("\nNo run completed successfully - nothing to report or plot.")

try:
    shutil.make_archive('/kaggle/working/hyperparameter_results', 'zip', PROJECT_NAME)
    print("\n✓ Results zipped as 'hyperparameter_results.zip' - DOWNLOAD THIS!")
except Exception as e:
    print(f"Zip creation failed ({e}), download individual files manually")

if best_model_path and os.path.exists(best_model_path):
    best_model = YOLO(best_model_path)
    best_model.save('BEST_id_detection_model.pt')
    print(f"\n✓ Best model saved as: BEST_id_detection_model.pt")

    # The checkpoint is only needed for resuming; drop it once the best model is exported.
    try:
        if os.path.exists(CHECKPOINT_FILE):
            os.remove(CHECKPOINT_FILE)
            print("✓ Checkpoint file cleaned up")
    except OSError as e:
        print(f"Could not remove checkpoint file: {e}")

print(f"✓ Final best mAP@0.5: {best_map:.4f}")
if results_log:
    print(f"✓ Results visualization saved to: final_hyperparameter_results.png")


Dataset configuration saved to dataset.yaml
Starting fresh hyperparameter tuning...

=== Configuration 1/5 ===
lr0: 0.002, weight_decay: 0.0005, imgsz: 640, batch: 16
degrees: 2, translate: 0.05, scale: 0.1
mosaic: 0.5, hsv_h: 0.015, hsv_s: 0.7
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2/6.2MB 80.2MB/s 0.1s
Ultralytics 8.3.191 🚀 Python-3.11.13 torch-2.6.0+cu124 CUDA:0 (Tesla T4, 15095MiB)
[34m[1mengine/trainer: [0magnostic_nms=False, amp=True, augment=False, auto_augment=randaugment, batch=16, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=dataset.yaml, degrees=2, deterministic=True, device=0, dfl=1.5, dnn=False, dropout=0.0, dynamic=False, embed=None, epochs=35, erasing=0.4, exist_ok=False, fliplr=0.0, flipud=0.0, format=torchscript, fraction=1.0, freeze=None, half=False, hsv_h=0.015, hs

## Evaluation

In [None]:
!pip install ultralytics

In [None]:
from ultralytics import YOLO
import os

best_model_path = '/kaggle/input/id-bound-model/id_detection_best.pt'

# Fail fast when the exported weights are not attached to the session.
if not os.path.exists(best_model_path):
    raise FileNotFoundError(f"Model not found at {best_model_path}")

model = YOLO(best_model_path)
print(f"✓ Loaded best model from {best_model_path}")

# Validate with the dataset description shipped next to the weights.
val_options = dict(
    data='/kaggle/input/id-bound-model/dataset_id_detection.yaml',
    save_json=True,
    plots=True,
)
metrics = model.val(**val_options)

print("\n=== DETECTION METRICS ===")
print(f"mAP@0.5: {metrics.box.map50:.4f}")
print(f"mAP@0.5:0.95: {metrics.box.map:.4f}")
print(f"Precision: {metrics.box.mp:.4f}")
print(f"Recall: {metrics.box.mr:.4f}")

In [None]:
test_images = '/kaggle/input/id-bound/test/images'

# Same dataset YAML as the validation cell, evaluated on the 'test' split.
test_val_options = dict(
    data='/kaggle/input/id-bound-model/dataset_id_detection.yaml',
    split='test',
    save_json=True,
    plots=True,
)
metrics = model.val(**test_val_options)

print("\n=== TEST METRICS ===")
print(f"mAP@0.5: {metrics.box.map50:.4f}")
print(f"mAP@0.5:0.95: {metrics.box.map:.4f}")
print(f"Precision: {metrics.box.mp:.4f}")
print(f"Recall: {metrics.box.mr:.4f}")

# Run inference on the raw test images without writing anything to disk.
predict_options = dict(
    source=test_images,
    save=False,
    save_txt=False,
    imgsz=640,
    conf=0.25,
    verbose=False,
)
results = model.predict(**predict_options)

# Show only the first five predictions.
for r in results[:5]:
    r.show()

# Rotation Correction 

## Hyperparameter Tuning

In [None]:
import os
import math
import time
import json
import itertools
import numpy as np
from collections import defaultdict
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import models
import albumentations as A
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

class EarlyStopping:
    """Stop training when a monitored value stops improving.

    Call the instance once per epoch with the value to minimise and the model.
    A value counts as an improvement when it is lower than the best value seen
    so far by more than ``min_delta``.

    Args:
        patience: Number of consecutive non-improving calls after which the
            instance signals a stop.
        min_delta: Minimum decrease required to count as an improvement.
        restore_best_weights: When True, the weights captured at the best
            value are loaded back into the model at the moment of stopping.
    """

    def __init__(self, patience=7, min_delta=0.1, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.best_loss = None
        self.counter = 0
        self.best_weights = None

    def __call__(self, val_loss, model):
        """Record ``val_loss`` and return True when training should stop."""
        if self.best_loss is None or val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            # dict.copy() is shallow: its values are still the live parameter
            # tensors, which later optimizer steps overwrite in place. Clone each
            # tensor so the snapshot really holds the best weights.
            self.best_weights = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
        else:
            self.counter += 1

        if self.counter >= self.patience:
            # Stop regardless of restore_best_weights; restoring is optional.
            if self.restore_best_weights and self.best_weights is not None:
                model.load_state_dict(self.best_weights)
            return True
        return False

# class IDRotationDataset(Dataset):
#     def __init__(self, image_dir, transform=None, angles=None, rotation_jitter=0, original_angle=0):
#         self.image_paths = [os.path.join(image_dir, f) for f in os.listdir(image_dir) if f.lower().endswith(('png','jpg','jpeg'))]
#         self.transform = transform
#         self.angles = angles
#         self.rotation_jitter = rotation_jitter
#         self.original_angle = original_angle
    
#     def __len__(self):
#         return len(self.image_paths)
    
#     def __getitem__(self, idx):
#         img = Image.open(self.image_paths[idx]).convert("RGB")
#         target_angle = np.random.choice(self.angles) if self.angles else np.random.uniform(0, 360)
#         actual_rotation = target_angle + np.random.uniform(-self.rotation_jitter, self.rotation_jitter) if self.rotation_jitter else target_angle
#         rotation_amount = actual_rotation - self.original_angle
#         img = np.array(img.rotate(rotation_amount, expand=True, fillcolor=(255,255,255)))
#         if self.transform:
#             img = self.transform(image=img)["image"]
#         target_rad = math.radians(target_angle)
#         label = torch.tensor([math.sin(target_rad), math.cos(target_rad)], dtype=torch.float)
#         return img, label

# Training pipeline: resize first, then random geometric / photometric
# augmentations, and finally normalisation and tensor conversion.
_train_steps = [
    A.Resize(224, 224),
    A.Perspective(scale=(0.0, 0.08), p=0.5),
    A.Affine(scale=(0.8, 1.2), shear=(-10, 10), p=0.5),
    A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
    A.ImageCompression(quality_range=(40, 90), p=0.5),
    A.Blur(blur_limit=(3, 5), p=0.2),
    A.GaussNoise(var_limit=(5, 15), p=0.2),
    A.RandomShadow(p=0.2),
    A.RandomSunFlare(p=0.2, src_radius=50, flare_roi=(0,0,1,0.5), src_color=(255,255,255)),
    A.Normalize(mean=(0.5,0.5,0.5), std=(0.5,0.5,0.5)),
    ToTensorV2(),
]
train_transform = A.Compose(_train_steps)

# Validation pipeline: no random augmentation, only resize + normalise + tensor.
_valid_steps = [
    A.Resize(224, 224),
    A.Normalize(mean=(0.5,0.5,0.5), std=(0.5,0.5,0.5)),
    ToTensorV2(),
]
valid_transform = A.Compose(_valid_steps)

# class RotationRegressor(nn.Module):
#     def __init__(self, backbone_name="mobilenet_v3_small", pretrained=True):
#         super().__init__()
#         backbones = {
#             "mobilenet_v3_small": (models.mobilenet_v3_small(pretrained=pretrained), 576),
#             "mobilenet_v3_large": (models.mobilenet_v3_large(pretrained=pretrained), 960),
#             "resnet18": (models.resnet18(pretrained=pretrained), 512),
#             "efficientnet_b0": (models.efficientnet_b0(pretrained=pretrained), 1280),
#             "efficientnet_b2": (models.efficientnet_b2(pretrained=pretrained), 1408)
#         }
#         if backbone_name not in backbones:
#             raise ValueError(f"Unknown backbone: {backbone_name}")
#         backbone, feature_dim = backbones[backbone_name]
#         if "mobilenet" in backbone_name or "efficientnet" in backbone_name:
#             backbone.classifier = nn.Identity()
#         else:
#             backbone.fc = nn.Identity()
#         self.backbone = backbone
#         self.head = nn.Sequential(nn.Linear(feature_dim, 128), nn.ReLU(), nn.Dropout(0.1), nn.Linear(128,2))
        
#     def forward(self, x):
#         return F.normalize(self.head(self.backbone(x)), dim=1)

# def vector_to_angle(vec):
#     return math.degrees(math.atan2(vec[0], vec[1])) % 360

# def discretize_angle(angle, classes):
#     return min(classes, key=lambda x: abs(x - angle))

# def angular_error(preds, labels):
#     pred_angles = np.array([vector_to_angle(p) for p in preds])
#     true_angles = np.array([vector_to_angle(l) for l in labels])
#     diffs = np.abs(pred_angles - true_angles)
#     diffs = np.minimum(diffs, 360 - diffs)
#     return np.mean(diffs), np.std(diffs)

# def train_model(config, train_loader, valid_loader, device, max_epochs=30):
#     model = RotationRegressor(config['backbone']).to(device)
#     criterion = nn.MSELoss() if config['criterion']=='MSE' else nn.SmoothL1Loss()
#     optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'], weight_decay=config['weight_decay']) if config['optimizer']=='Adam' else torch.optim.AdamW(model.parameters(), lr=config['lr'], weight_decay=config['weight_decay'])
#     early_stopping = EarlyStopping(patience=7, min_delta=0.1)
#     train_losses, val_losses, angular_errors = [], [], []
#     angle_classes = config['angle_classes']
#     start_time = time.time()
    
#     for epoch in range(max_epochs):
#         model.train()
#         train_loss = sum(criterion(model(imgs.to(device)), labels.to(device)).item() * imgs.size(0) for imgs, labels in train_loader)/len(train_loader.dataset)
#         train_losses.append(train_loss)
#         model.eval()
#         val_loss, all_pred_vecs, all_true_vecs, all_pred, all_true = 0, [], [], [], []
#         with torch.no_grad():
#             for imgs, labels in valid_loader:
#                 imgs, labels = imgs.to(device), labels.to(device)
#                 preds = model(imgs)
#                 val_loss += criterion(preds, labels).item()*imgs.size(0)
#                 all_pred_vecs.extend(preds.cpu().numpy())
#                 all_true_vecs.extend(labels.cpu().numpy())
#                 if config['evaluation_mode']=='classification':
#                     for p,l in zip(preds,labels):
#                         all_pred.append(discretize_angle(vector_to_angle(p.cpu().numpy()), angle_classes))
#                         all_true.append(discretize_angle(vector_to_angle(l.cpu().numpy()), angle_classes))
#         val_loss /= len(valid_loader.dataset)
#         val_losses.append(val_loss)
#         mean_ang_err, std_ang_err = angular_error(all_pred_vecs, all_true_vecs)
#         angular_errors.append(mean_ang_err)
#         if config['evaluation_mode']=='classification':
#             acc = accuracy_score(all_true, all_pred)
#             prec, rec, f1, _ = precision_recall_fscore_support(all_true, all_pred, average="weighted", zero_division=0)
#             print(f"Epoch {epoch+1}/{max_epochs} | TL: {train_loss:.4f} | VL: {val_loss:.4f} | Acc: {acc:.3f} | F1: {f1:.3f} | AngErr: {mean_ang_err:.2f}°±{std_ang_err:.2f}")
#         else:
#             print(f"Epoch {epoch+1}/{max_epochs} | TL: {train_loss:.4f} | VL: {val_loss:.4f} | AngErr: {mean_ang_err:.2f}°±{std_ang_err:.2f}")
#         if early_stopping(mean_ang_err, model):
#             print(f"Early stopping at epoch {epoch+1}")
#             break

#     training_time = time.time() - start_time
#     return {
#         'config': config,
#         'best_angular_error': min(angular_errors),
#         'final_angular_error': angular_errors[-1],
#         'training_time': training_time,
#         'epochs_trained': len(train_losses),
#         'train_losses': train_losses,
#         'val_losses': val_losses,
#         'angular_errors': angular_errors
#     }

# def get_hyperparameter_grid():
#     criterions = ['MSE','SmoothL1Loss']
#     optimizers = ['Adam','AdamW']
#     learning_rates = [1e-3,5e-4,1e-4,1e-5]
#     weight_decays = [0,1e-4]
#     backbones = ['mobilenet_v3_small','mobilenet_v3_large','resnet18','efficientnet_b0','efficientnet_b2']
#     angle_configs = [
#         {'name':'8_classes_coarse','angles':[0,45,90,135,180,225,270,315],'angle_classes':[0,45,90,135,180,225,270,315],'evaluation_mode':'classification'},
#         {'name':'16_classes_fine','angles':[i*22.5 for i in range(16)],'angle_classes':[i*22.5 for i in range(16)],'evaluation_mode':'classification'},
#         {'name':'pure_regression','angles':None,'angle_classes':None,'evaluation_mode':'regression'}
#     ]
#     rotation_jitters = [0,5]
#     batch_sizes = [16,32,64]
#     return [
#         {
#             'criterion':c,'optimizer':o,'lr':lr,'weight_decay':wd,'backbone':b,
#             'angles':ac['angles'],'angle_classes':ac['angle_classes'],'evaluation_mode':ac['evaluation_mode'],
#             'angle_config_name':ac['name'],'rotation_jitter':j,'batch_size':bs
#         }
#         for c,o,lr,wd,b,ac,j,bs in itertools.product(criterions,optimizers,learning_rates,weight_decays,backbones,angle_configs,rotation_jitters,batch_sizes)
#     ]

# def run_hyperparameter_search(image_dir_train, image_dir_valid, max_configs=50):
#     device = "cuda" if torch.cuda.is_available() else "cpu"
#     param_grid = get_hyperparameter_grid()
#     if len(param_grid) > max_configs:
#         param_grid = np.random.choice(param_grid, max_configs, replace=False).tolist()
#     all_results = []
#     for config in param_grid:
#         try:
#             train_loader = DataLoader(IDRotationDataset(image_dir_train, transform=train_transform, angles=config['angles'], rotation_jitter=config['rotation_jitter']), batch_size=config['batch_size'], shuffle=True, num_workers=2)
#             valid_loader = DataLoader(IDRotationDataset(image_dir_valid, transform=valid_transform, angles=config['angles'], rotation_jitter=0), batch_size=config['batch_size'], shuffle=False, num_workers=2)
#             all_results.append(train_model(config, train_loader, valid_loader, device))
#             torch.cuda.empty_cache()
#         except Exception as e:
#             print(f"Error: {e}")
#     return all_results

# def analyze_results(results):
#     if not results: return None
#     results.sort(key=lambda x: x['best_angular_error'])
#     fig, axes = plt.subplots(2,3,figsize=(18,12))
#     backbones = [r['config']['backbone'] for r in results]
#     angular_errors = [r['best_angular_error'] for r in results]
#     def plot_bar(ax, keys, values, title): ax.bar(range(len(keys)), values); ax.set_xticks(range(len(keys))); ax.set_xticklabels(keys, rotation=45); ax.set_title(title)
#     backbone_means = [np.mean([ae for b,ae in zip(backbones,angular_errors) if b==bk]) for bk in set(backbones)]
#     plot_bar(axes[0,0], list(set(backbones)), backbone_means, "Angular Error by Backbone")
#     best_result = results[0]
#     axes[1,2].plot(best_result['angular_errors'], 'b-'); axes[1,2].set_title(f'Best Model Training Curve\nError: {best_result["best_angular_error"]:.2f}°')
#     plt.tight_layout(); plt.show()
#     return best_result

# TRAIN_DIR = "/kaggle/input/idtextlines/train/images"
# VALID_DIR = "/kaggle/input/idtextlines/valid/images"
# # results = run_hyperparameter_search(TRAIN_DIR, VALID_DIR, max_configs=10)
# # best_config = analyze_results(results)
# # with open('hyperparameter_search_results.json','w') as f:
# #     for r in results: 
# #         for k in ['train_losses','val_losses','angular_errors']: r[k] = [float(x) for x in r[k]]
# #     json.dump(results,f,indent=2)
# # print(f"Best angular error: {best_config['best_angular_error']:.2f}°")

## Train

In [None]:
import os
import math
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import models
import albumentations as A
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, confusion_matrix, ConfusionMatrixDisplay


class IDRotationDataset(Dataset):
    """Images rotated on the fly, labelled with the target angle as (sin, cos).

    Every ``__getitem__`` call draws a target angle (from ``angles`` when it is
    given and non-empty, otherwise uniformly from [0, 360)), rotates the image
    by ``target + jitter - original_angle`` degrees and returns the label
    ``[sin(target), cos(target)]``. The jitter affects the image only, not the
    label.

    Args:
        image_dir: Directory scanned (non-recursively) for png/jpg/jpeg files.
        transform: Optional callable invoked as ``transform(image=ndarray)``
            whose result is indexed with ``["image"]``.
        angles: Optional sequence (list or array) of allowed target angles in
            degrees.
        rotation_jitter: Half-width in degrees of the uniform noise added to
            the applied rotation; 0 disables it.
        original_angle: Angle subtracted from the applied rotation.
    """

    def __init__(self, image_dir, transform=None, angles=None, rotation_jitter=0, original_angle=0):
        # os.listdir order is arbitrary; sort so index -> file is reproducible.
        self.image_paths = [os.path.join(image_dir, f) for f in sorted(os.listdir(image_dir))
                            if f.lower().endswith(('png', 'jpg', 'jpeg'))]
        self.transform = transform
        self.angles = angles
        self.rotation_jitter = rotation_jitter
        self.original_angle = original_angle

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        # Context manager releases the file handle; convert() returns a loaded copy.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")

        # Explicit None/length check: plain truthiness raises for NumPy arrays.
        has_angles = self.angles is not None and len(self.angles) > 0
        target_angle = np.random.choice(self.angles) if has_angles else np.random.uniform(0, 360)
        jitter = np.random.uniform(-self.rotation_jitter, self.rotation_jitter) if self.rotation_jitter else 0
        rotation_amount = target_angle + jitter - self.original_angle

        # expand=True grows the canvas to fit the rotated image; new area is white.
        img = img.rotate(rotation_amount, expand=True, fillcolor=(255, 255, 255))
        img = np.array(img)
        if self.transform:
            img = self.transform(image=img)["image"]

        angle_rad = math.radians(target_angle)
        label = torch.tensor([math.sin(angle_rad), math.cos(angle_rad)], dtype=torch.float)

        return img, label

def get_transforms():
    """Build the albumentations pipelines for training and validation.

    Returns:
        ``(train_transform, valid_transform)``. Both resize to 224x224,
        normalise with mean/std 0.5 per channel and convert to a tensor; the
        training pipeline additionally applies random perspective, affine,
        brightness/contrast, compression, blur, noise, shadow and sun-flare
        augmentations.
    """
    train_transform = A.Compose([
        A.Resize(224, 224),
        A.Perspective(scale=(0.0, 0.08), p=0.5),
        A.Affine(scale=(0.8, 1.2), shear=(-10, 10), p=0.5),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.ImageCompression(quality_range=(40, 90), p=0.5),
        A.Blur(blur_limit=(3, 5), p=0.2),
        A.GaussNoise(var_limit=(5, 15), p=0.2),
        A.RandomShadow(p=0.2),
        A.RandomSunFlare(p=0.2, src_radius=50, flare_roi=(0,0,1,0.5), src_color=(255,255,255)),
        A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
        ToTensorV2()
    ])

    valid_transform = A.Compose([
        A.Resize(224, 224),
        A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
        ToTensorV2()
    ])

    return train_transform, valid_transform

def create_dataloaders(train_dir, valid_dir, angles, batch_size=16, rotation_jitter=5, num_workers=2):
    """Create the training and validation ``DataLoader`` pair.

    The training dataset uses the augmenting transform and ``rotation_jitter``
    and is shuffled; the validation dataset uses the plain transform, no
    jitter and no shuffling.

    Args:
        train_dir: Directory with training images.
        valid_dir: Directory with validation images.
        angles: Allowed target angles, forwarded to ``IDRotationDataset``.
        batch_size: Batch size for both loaders.
        rotation_jitter: Jitter in degrees for the training dataset only.
        num_workers: Worker processes for both loaders.

    Returns:
        ``(train_loader, valid_loader)``.
    """
    train_tf, valid_tf = get_transforms()
    shared_options = dict(batch_size=batch_size, num_workers=num_workers)

    train_loader = DataLoader(
        IDRotationDataset(train_dir, transform=train_tf, angles=angles, rotation_jitter=rotation_jitter),
        shuffle=True,
        **shared_options,
    )
    valid_loader = DataLoader(
        IDRotationDataset(valid_dir, transform=valid_tf, angles=angles, rotation_jitter=0),
        shuffle=False,
        **shared_options,
    )

    return train_loader, valid_loader

class RotationRegressor(nn.Module):
    """MobileNetV3-Small backbone with a small head predicting a unit 2-vector.

    The backbone's classifier is replaced by ``nn.Identity`` and its 576
    features feed a two-layer head. The output is L2-normalised along dim 1,
    matching the ``[sin, cos]`` labels produced by ``IDRotationDataset``.

    Args:
        pretrained: When True, load the ImageNet weights
            (``MobileNet_V3_Small_Weights.IMAGENET1K_V1``); otherwise start
            from random initialisation.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        # `pretrained=` is deprecated in torchvision; `weights=` is the supported
        # API. IMAGENET1K_V1 is what `pretrained=True` used to resolve to.
        weights = models.MobileNet_V3_Small_Weights.IMAGENET1K_V1 if pretrained else None
        self.backbone = models.mobilenet_v3_small(weights=weights)
        self.backbone.classifier = nn.Identity()
        self.head = nn.Sequential(
            nn.Linear(576, 128),
            nn.ReLU(),
            nn.Linear(128, 2)
        )

    def forward(self, x):
        """Return the L2-normalised 2-vector predicted for each input in the batch."""
        features = self.backbone(x)
        out = self.head(features)
        return F.normalize(out, dim=1)


# Angle classes (degrees) used when turning a predicted angle into a class.
angle_classes = [0, 45, 90, 135, 180, 225, 270, 315]

def vector_to_angle(vec):
    """Convert a ``(sin, cos)`` pair to an angle in degrees, wrapped with ``% 360``.

    Args:
        vec: Indexable with the sine at position 0 and the cosine at position 1.
    """
    angle = math.degrees(math.atan2(vec[0], vec[1]))
    return angle % 360

def discretize_angle(angle, classes=angle_classes):
    """Return the entry of ``classes`` closest to ``angle`` on the circle.

    Distance is measured around the circle, so 350 degrees is 10 degrees away
    from class 0 rather than 350. Ties resolve to the earlier entry in
    ``classes``.

    Args:
        angle: Angle in degrees.
        classes: Candidate class angles in degrees.
    """
    def circular_gap(candidate):
        gap = abs(candidate - angle) % 360
        return min(gap, 360 - gap)

    return min(classes, key=circular_gap)

def angular_error(preds, labels):
    """Mean and standard deviation of the circular angle error, in degrees.

    Each prediction and label vector is converted with ``vector_to_angle``;
    the absolute difference is then folded so that it never exceeds the
    shorter way around the circle.

    Args:
        preds: Iterable of predicted ``(sin, cos)`` vectors.
        labels: Iterable of ground-truth ``(sin, cos)`` vectors.

    Returns:
        ``(mean_error, std_error)``.
    """
    predicted_deg = np.array(list(map(vector_to_angle, preds)))
    actual_deg = np.array(list(map(vector_to_angle, labels)))

    gap = np.abs(predicted_deg - actual_deg)
    # Take the shorter arc between the two angles.
    shortest = np.minimum(gap, 360 - gap)

    return np.mean(shortest), np.std(shortest)

def train_model(model, train_loader, valid_loader, criterion, optimizer, device, num_epochs=30, early_stopping=None):
    """Train ``model`` and validate it after every epoch.

    Per epoch, the function runs one optimisation pass over ``train_loader``,
    then one no-grad pass over ``valid_loader`` in which it collects the raw
    output/label vectors and their discretised angle classes, and prints a
    one-line summary (losses, accuracy, weighted precision/recall/F1, mean
    angular error).

    Args:
        model: Module called as ``model(imgs)``.
        train_loader: Iterable of ``(imgs, labels)`` batches with a ``dataset`` attribute.
        valid_loader: Iterable of ``(imgs, labels)`` batches with a ``dataset`` attribute.
        criterion: Loss called as ``criterion(preds, labels)``.
        optimizer: Optimiser stepped once per training batch.
        device: Device the batches are moved to.
        num_epochs: Maximum number of epochs.
        early_stopping: Optional callable invoked as
            ``early_stopping(val_loss, model)`` after each epoch; a truthy
            return value stops training.

    Returns:
        ``(train_losses, val_losses, all_true, all_pred)``. The two loss lists
        hold one sample-weighted average per epoch. ``all_true`` / ``all_pred``
        are the discretised angle classes from the last validation pass that
        ran (both empty when ``num_epochs`` is 0).
        NOTE(review): if ``early_stopping`` restores earlier weights into
        ``model``, these labels still describe the last epoch, not the
        restored weights -- confirm this is what the confusion matrix should show.
    """
    train_losses, val_losses = [], []
    # Bound up front so the return statement is valid even if no epoch runs.
    all_true, all_pred = [], []

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for imgs, labels in train_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad()
            preds = model(imgs)
            loss = criterion(preds, labels)
            loss.backward()
            optimizer.step()
            # Weight by batch size so the epoch figure is a per-sample average.
            running_loss += loss.item() * imgs.size(0)
        train_loss = running_loss / len(train_loader.dataset)
        train_losses.append(train_loss)

        model.eval()
        val_loss = 0.0
        all_true, all_pred, all_true_vecs, all_pred_vecs = [], [], [], []
        with torch.no_grad():
            for imgs, labels in valid_loader:
                imgs, labels = imgs.to(device), labels.to(device)
                preds = model(imgs)
                loss = criterion(preds, labels)
                val_loss += loss.item() * imgs.size(0)

                # One device-to-host copy per batch instead of one per sample.
                preds_np = preds.cpu().numpy()
                labels_np = labels.cpu().numpy()
                all_pred_vecs.extend(preds_np)
                all_true_vecs.extend(labels_np)
                for pred_vec, label_vec in zip(preds_np, labels_np):
                    all_pred.append(discretize_angle(vector_to_angle(pred_vec)))
                    all_true.append(discretize_angle(vector_to_angle(label_vec)))

        val_loss /= len(valid_loader.dataset)
        val_losses.append(val_loss)
        mean_ang_err, std_ang_err = angular_error(all_pred_vecs, all_true_vecs)
        acc = accuracy_score(all_true, all_pred)
        prec, rec, f1, _ = precision_recall_fscore_support(all_true, all_pred, average="weighted", zero_division=0)

        print(f"Epoch {epoch+1}/{num_epochs} | "
              f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | "
              f"Acc: {acc:.3f} | Prec: {prec:.3f} | Rec: {rec:.3f} | F1: {f1:.3f} | "
              f"MeanAngErr: {mean_ang_err:.2f}° ± {std_ang_err:.2f}")

        if early_stopping and early_stopping(val_loss, model):
            print(f"Early stopping at epoch {epoch+1}")
            break

    return train_losses, val_losses, all_true, all_pred


def plot_training_curve(train_losses, val_losses):
    """Plot training and validation loss against epoch index and show the figure."""
    _, ax = plt.subplots(figsize=(8, 5))
    ax.plot(train_losses, label="Train Loss")
    ax.plot(val_losses, label="Val Loss")
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Loss")
    ax.legend()
    ax.set_title("Training & Validation Loss")
    plt.show()

def plot_confusion_matrix(y_true, y_pred, classes=angle_classes):
    """Show a confusion matrix of discretised angle classes.

    Args:
        y_true: True class labels.
        y_pred: Predicted class labels.
        classes: Label values, used both as the matrix ordering and as the
            tick labels.
    """
    counts = confusion_matrix(y_true, y_pred, labels=classes)
    ConfusionMatrixDisplay(confusion_matrix=counts, display_labels=classes).plot(
        cmap="Blues", xticks_rotation=45
    )
    plt.title("Confusion Matrix (Final Epoch)")
    plt.show()


# Train on GPU when torch can see one, otherwise fall back to CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Same eight values as `angle_classes`; forwarded to both datasets by create_dataloaders.
angles = [0, 45, 90, 135, 180, 225, 270, 315]
train_loader, valid_loader = create_dataloaders(
    "/kaggle/input/idtextlines/train/images",
    "/kaggle/input/idtextlines/valid/images",
    angles
)

model = RotationRegressor(pretrained=True).to(device)
# The loss compares the model's normalised 2-vector output with the label vector directly.
criterion = nn.SmoothL1Loss()
optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)
# train_model calls this as early_stopping(val_loss, model) after every epoch
# and stops when it returns a truthy value.
early_stopping = EarlyStopping(patience=5, min_delta=0.001, restore_best_weights=True)

# all_true / all_pred are the discretised angle classes from the last validation pass train_model ran.
train_losses, val_losses, all_true, all_pred = train_model(model, train_loader, valid_loader, criterion, optimizer, device, 30, early_stopping)
plot_training_curve(train_losses, val_losses)
plot_confusion_matrix(all_true, all_pred)


## Evaluation

In [None]:
import torch

device = "cuda" if torch.cuda.is_available() else "cpu"

model = RotationRegressor().to(device)

checkpoint_path = "/kaggle/input/rotation-correction-model/RotationClassification.pth"
# strict=False lets load_state_dict skip keys that do not line up, which would
# otherwise go unnoticed and leave part of the model at its initial weights.
# Keep the returned report and surface any mismatch before evaluating.
load_report = model.load_state_dict(torch.load(checkpoint_path, map_location=device), strict=False)
if load_report.missing_keys or load_report.unexpected_keys:
    print(
        "Warning: checkpoint does not fully match RotationRegressor | "
        f"missing keys: {load_report.missing_keys} | "
        f"unexpected keys: {load_report.unexpected_keys}"
    )

model.eval()


In [None]:
# Fetch the validation transform explicitly: in the code around this cell it is
# only ever bound as a local inside create_dataloaders, so relying on a global
# `valid_transform` would make this cell depend on leftover kernel state.
_, valid_transform = get_transforms()

# NOTE(review): angles=None / original_angle=0 are passed through as-is; their
# meaning is defined by IDRotationDataset -- confirm they select the intended
# evaluation mode.
valid_dataset = IDRotationDataset(
    image_dir="/kaggle/input/idtextlines/valid/images",
    transform=valid_transform,
    angles=None,
    rotation_jitter=0,
    original_angle=0
)

valid_loader = DataLoader(
    valid_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=2
)

# Collect raw output and label vectors over the whole validation set.
model.eval()
all_pred_vecs, all_true_vecs = [], []

with torch.no_grad():
    for imgs, labels in valid_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        preds = model(imgs)
        all_pred_vecs.extend(preds.cpu().numpy())
        all_true_vecs.extend(labels.cpu().numpy())

mean_ang_err, std_ang_err = angular_error(all_pred_vecs, all_true_vecs)
print(f"Mean Angular Error: {mean_ang_err:.2f}° ± {std_ang_err:.2f}°")


# ID Lines Detection

## Train and Hyperparameter Tuning

In [None]:
!pip install ultralytics

In [None]:
import os
import json
import random
from datetime import datetime
import yaml
import matplotlib.pyplot as plt
import pandas as pd
from ultralytics import YOLO

def create_dataset_config():
    """Write the single-class dataset description to ``textlines_dataset.yaml``.

    The file is created in the current working directory and names the
    dataset root, the train/validation image folders, the class count and
    the class name.
    """
    dataset_config = dict(
        path='/kaggle/input/idtextlines',
        train='train/images',
        val='valid/images',
        nc=1,
        names=['text_line'],
    )
    with open('textlines_dataset.yaml', 'w') as stream:
        yaml.dump(dataset_config, stream)
    print("Dataset config created: textlines_dataset.yaml")


def sample_params():
    """Draw one random hyperparameter set for a training trial.

    ``epochs`` and ``batch`` are picked from fixed option lists; every other
    entry is drawn uniformly from its ``(low, high)`` range. Values are drawn
    in the order the keys are listed below.

    Returns:
        dict mapping hyperparameter name to the sampled value.
    """
    choice_space = (
        ('epochs', [30, 50, 80, 100]),
        ('batch', [8, 16, 24, 32]),
    )
    uniform_space = (
        ('lr0', 0.0005, 0.02),
        ('lrf', 0.01, 0.2),
        ('momentum', 0.8, 0.95),
        ('weight_decay', 0.0001, 0.001),
        ('warmup_epochs', 1, 5),
        ('warmup_momentum', 0.5, 0.9),
        ('box', 5, 10),
        ('cls', 0.3, 1.0),
        ('dfl', 1.0, 2.0),
        ('hsv_h', 0.01, 0.02),
        ('hsv_s', 0.5, 0.9),
        ('hsv_v', 0.3, 0.6),
        ('degrees', 5, 15),
        ('translate', 0.05, 0.2),
        ('scale', 0.3, 0.7),
        ('mixup', 0.0, 0.3),
    )

    sampled = {}
    for key, options in choice_space:
        sampled[key] = random.choice(options)
    for key, low, high in uniform_space:
        sampled[key] = random.uniform(low, high)
    return sampled


def random_search_tuning(n_trials=20, patience=10):
    """Run a random hyperparameter search and log the mAP50 of every trial.

    Each trial samples a parameter set with ``sample_params``, trains a fresh
    ``yolov8n.pt`` model on ``textlines_dataset.yaml`` and records the
    resulting mAP50. A trial that raises is reported and skipped so that one
    bad configuration does not abort the search. All successful trials are
    written to ``random_search_results.json``.

    Args:
        n_trials: Number of parameter sets to try.
        patience: Early-stopping patience (epochs) passed to every training
            run. A ``patience`` key in the sampled parameters takes precedence.

    Returns:
        ``(best_params, results_log)``: the parameter dict of the best trial
        (``None`` if no trial scored above 0) and the list of per-trial log
        entries.
    """
    results_log = []
    best_map = 0
    best_params = None

    print(f"Starting random search with {n_trials} trials...")

    for trial in range(n_trials):
        params = sample_params()
        # The announced patience is actually handed to the trainer below.
        train_kwargs = {'patience': patience, **params}
        print(f"\nTrial {trial+1}/{n_trials}")
        print(f"Parameters: {params}")
        print(f"Early stopping patience: {train_kwargs['patience']} epochs")

        try:
            model = YOLO('yolov8n.pt')

            results = model.train(
                data='textlines_dataset.yaml',
                project='random_search',
                name=f'trial_{trial}',
                verbose=False,
                plots=False,
                **train_kwargs
            )

            # The object returned by train() exposes its scores through
            # `results_dict` (the same access the evaluation cell uses); it
            # has no `metrics` attribute. float() keeps the entry JSON-serialisable.
            map_score = float(results.results_dict['metrics/mAP50(B)'])

            result_entry = {
                'trial': trial,
                'map50': map_score,
                'patience': train_kwargs['patience'],
                **params,
                'timestamp': datetime.now().isoformat()
            }
            results_log.append(result_entry)

            if map_score > best_map:
                best_map = map_score
                best_params = params
                print(f"New best mAP: {map_score:.4f}")

        except Exception as e:
            # Deliberate best-effort: report the failure and move on to the next trial.
            print(f"Error in trial {trial}: {str(e)}")

    with open('random_search_results.json', 'w') as f:
        json.dump(results_log, f, indent=2)

    return best_params, results_log



def analyze_hyperparameter_results():
    """Combine saved tuning logs, plot an overview and report the best run.

    Reads, when present in the working directory,
    ``grid_search_results.json``, ``random_search_results.json`` and
    ``optuna_results.csv``. Every record is tagged with a ``method`` and the
    collection is plotted on a 2x3 grid (saved to
    ``hyperparameter_analysis.png`` and shown).

    Returns:
        DataFrame with one row per record, or ``None`` when no log was found.
    """
    json_logs = ('grid_search_results.json', 'random_search_results.json')
    optuna_csv = 'optuna_results.csv'
    all_results = []

    # JSON logs: a list of dicts per file; the method is the file-name prefix.
    for log_path in json_logs:
        if not os.path.exists(log_path):
            continue
        with open(log_path, 'r') as handle:
            for record in json.load(handle):
                record['method'] = log_path.split('_')[0]
                all_results.append(record)

    # Optuna export: one row per trial, parameters in 'params_*' columns.
    if os.path.exists(optuna_csv):
        optuna_df = pd.read_csv(optuna_csv)
        param_columns = [name for name in optuna_df.columns if name.startswith('params_')]
        for _, trial_row in optuna_df.iterrows():
            if 'value' in trial_row and not pd.isna(trial_row['value']):
                record = {
                    'map50': trial_row['value'],
                    'method': 'optuna',
                    'experiment': trial_row['number']
                }
                for name in param_columns:
                    record[name.replace('params_', '')] = trial_row[name]
                all_results.append(record)

    if len(all_results) == 0:
        print("No tuning results found. Run tuning first.")
        return

    df = pd.DataFrame(all_results)
    plt.figure(figsize=(15, 10))

    def boxplot_panel(slot, group_col, title):
        # mAP50 distribution per value of `group_col`; panel stays empty if the column is absent.
        plt.subplot(2, 3, slot)
        if group_col in df.columns:
            df.boxplot(column='map50', by=group_col, ax=plt.gca())
            plt.title(title)
            plt.suptitle('')

    def scatter_panel(slot, x_col, x_label, title):
        # mAP50 against `x_col`; panel stays empty if the column is absent.
        plt.subplot(2, 3, slot)
        if x_col in df.columns:
            plt.scatter(df[x_col], df['map50'], alpha=0.6)
            plt.xlabel(x_label)
            plt.ylabel('mAP50')
            plt.title(title)

    boxplot_panel(1, 'method', 'mAP50 by Tuning Method')
    scatter_panel(2, 'lr0', 'Learning Rate', 'Learning Rate vs mAP50')
    boxplot_panel(3, 'batch', 'mAP50 by Batch Size')
    scatter_panel(4, 'epochs', 'Epochs', 'Epochs vs mAP50')
    scatter_panel(5, 'patience', 'Early Stopping Patience', 'Early Stopping Patience vs mAP50')

    # Last panel: the ten highest-scoring records, labelled by DataFrame index.
    plt.subplot(2, 3, 6)
    leaders = df.nlargest(10, 'map50')
    bar_positions = range(len(leaders))
    plt.barh(bar_positions, leaders['map50'])
    plt.yticks(bar_positions, [f"Exp {idx}" for idx in leaders.index])
    plt.xlabel('mAP50')
    plt.title('Top 10 Experiments')

    plt.tight_layout()
    plt.savefig('hyperparameter_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()

    winner = df.loc[df['map50'].idxmax()]
    print("\nBest overall result:")
    print(f"mAP50: {winner['map50']:.4f}")
    print(f"Method: {winner.get('method', 'unknown')}")
    print(f"Early stopping patience: {winner.get('patience', 'N/A')}")

    return df


# if __name__ == "__main__":
#     create_dataset_config()

#     best_random, random_log = random_search_tuning(n_trials=10)


#     best_model_path = os.path.join('random_search', 'best.pt')
#     if os.path.exists(best_model_path):
#         model = YOLO(best_model_path)
#         results = model.train(
#             data='textlines_dataset.yaml',
#             epochs=50,
#             lr0=0.001,
#             patience=10,
#             project='fine_tuning',
#             name='resumed_training'
#         )
#         print("\nResumed training completed.")

#     analyze_hyperparameter_results()



## Evaluation

In [None]:
from ultralytics import YOLO

best_model_path = '/kaggle/input/id-lines-detection/best.pt'
dataset_yaml = 'textlines_dataset.yaml'

# The dataset YAML is only written by create_dataset_config(); create it here
# if it is missing so this cell also works on a fresh kernel.
if not os.path.exists(dataset_yaml):
    create_dataset_config()

model = YOLO(best_model_path)

results = model.val(
    data=dataset_yaml,
    batch=32,
    imgsz=640,
    verbose=False
)
metrics_dict = results.results_dict

print("Validation Results:")
for metric_label, metric_key in (
    ("mAP50", 'metrics/mAP50(B)'),
    ("Precision", 'metrics/precision(B)'),
    ("Recall", 'metrics/recall(B)'),
):
    metric_value = metrics_dict.get(metric_key)
    # Apply the float format only to real values: formatting the 'N/A'
    # fallback with ':.4f' raises ValueError.
    metric_text = f"{metric_value:.4f}" if metric_value is not None else "N/A"
    print(f"{metric_label}: {metric_text}")