# üöÄ RDFNet Training & Evaluation

**Object Detection in Foggy Scenes**

This notebook trains RDFNet from scratch on your VOC-FOG dataset and evaluates on RTTS.

---

**Your Dataset Paths:**
- Training: `/content/drive/MyDrive/dataset/training/VOC2007 2/`
- Testing (RTTS): `/content/drive/MyDrive/dataset/RTTS/VOC2007/`
- Checkpoints: `/content/drive/MyDrive/RDFNet_training_checkpoints/`

---

| Step | Action |
|------|--------|
| **Fresh Start** | Run cells 1-7 |
| **Resume** | Run cells 1-4, then 5 |
| **Evaluate** | Run cells 1-4, then 8-9 |

## 1Ô∏è‚É£ Mount Drive & Clone Repository

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Clone fresh repo from GitHub
!rm -rf RDF_net
!git clone https://github.com/habibour/RDF_net.git
%cd RDF_net

# Install dependencies
!pip install thop -q

print("\n‚úÖ Setup complete!")

## 2Ô∏è‚É£ Verify Dataset Paths

In [None]:
import os

# Your paths
TRAIN_FOG = '/content/drive/MyDrive/dataset/training/VOC2007 2/FOG'
TRAIN_ANN = '/content/drive/MyDrive/dataset/training/VOC2007 2/Annotations'
TEST_IMG = '/content/drive/MyDrive/dataset/RTTS/VOC2007/JPEGImages'
TEST_ANN = '/content/drive/MyDrive/dataset/RTTS/VOC2007/Annotations'
CHECKPOINT_DIR = '/content/drive/MyDrive/RDFNet_training_checkpoints'

print("üìÅ Training Dataset:")
if os.path.exists(TRAIN_FOG):
    print(f"  ‚úÖ FOG images: {len(os.listdir(TRAIN_FOG))}")
else:
    print(f"  ‚ùå FOG path not found: {TRAIN_FOG}")

if os.path.exists(TRAIN_ANN):
    print(f"  ‚úÖ Annotations: {len(os.listdir(TRAIN_ANN))}")
else:
    print(f"  ‚ùå Annotations path not found: {TRAIN_ANN}")

print("\nüìÅ Testing Dataset (RTTS):")
if os.path.exists(TEST_IMG):
    print(f"  ‚úÖ Images: {len(os.listdir(TEST_IMG))}")
else:
    print(f"  ‚ùå Images path not found: {TEST_IMG}")

if os.path.exists(TEST_ANN):
    print(f"  ‚úÖ Annotations: {len(os.listdir(TEST_ANN))}")
else:
    print(f"  ‚ùå Annotations path not found: {TEST_ANN}")

print("\nüìÅ Checkpoints:")
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
existing = [f for f in os.listdir(CHECKPOINT_DIR) if f.endswith('.pth')]
if existing:
    print(f"  üì¶ Found {len(existing)} existing checkpoints")
    for f in sorted(existing)[-3:]:
        print(f"      - {f}")
else:
    print("  üì≠ No existing checkpoints (fresh training)")

## 3Ô∏è‚É£ Generate Annotations

In [None]:
import os
import xml.etree.ElementTree as ET
import random

# Paths
TRAIN_FOG = '/content/drive/MyDrive/dataset/training/VOC2007 2/FOG'
TRAIN_ANN = '/content/drive/MyDrive/dataset/training/VOC2007 2/Annotations'
TEST_IMG = '/content/drive/MyDrive/dataset/RTTS/VOC2007/JPEGImages'
TEST_ANN = '/content/drive/MyDrive/dataset/RTTS/VOC2007/Annotations'

# Classes
VOC_CLASSES = ["aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", 
               "chair", "cow", "diningtable", "dog", "horse", "motorbike", "person", 
               "pottedplant", "sheep", "sofa", "train", "tvmonitor"]
RTTS_CLASSES = ['bicycle', 'bus', 'car', 'motorbike', 'person']

def convert_annotation(xml_path, classes):
    try:
        tree = ET.parse(xml_path)
        root = tree.getroot()
        boxes = []
        for obj in root.iter('object'):
            difficult = obj.find('difficult')
            if difficult is not None and difficult.text == '1':
                continue
            name = obj.find('name').text
            if name not in classes:
                continue
            bbox = obj.find('bndbox')
            b = (int(float(bbox.find('xmin').text)), int(float(bbox.find('ymin').text)),
                 int(float(bbox.find('xmax').text)), int(float(bbox.find('ymax').text)))
            boxes.append(f"{b[0]},{b[1]},{b[2]},{b[3]},{classes.index(name)}")
        return boxes
    except:
        return []

# === Generate Training Annotations ===
print("="*60)
print("Generating Training Annotations")
print("="*60)

fog_files = set(os.listdir(TRAIN_FOG))
xml_files = [f[:-4] for f in os.listdir(TRAIN_ANN) if f.endswith('.xml')]
valid_ids = [x for x in xml_files if f"{x}.jpg" in fog_files]

print(f"FOG images: {len(fog_files)}")
print(f"XML annotations: {len(xml_files)}")
print(f"Valid pairs: {len(valid_ids)}")

random.seed(42)
random.shuffle(valid_ids)
train_split = int(len(valid_ids) * 0.9)
train_ids = valid_ids[:train_split]
val_ids = valid_ids[train_split:]

# Write train
train_count = 0
with open('2007_train.txt', 'w') as f:
    for img_id in train_ids:
        img_path = f"{TRAIN_FOG}/{img_id}.jpg"
        boxes = convert_annotation(f"{TRAIN_ANN}/{img_id}.xml", VOC_CLASSES)
        if boxes:
            f.write(f"{img_path} {' '.join(boxes)}\n")
            train_count += 1

# Write val
val_count = 0
with open('2007_val.txt', 'w') as f:
    for img_id in val_ids:
        img_path = f"{TRAIN_FOG}/{img_id}.jpg"
        boxes = convert_annotation(f"{TRAIN_ANN}/{img_id}.xml", VOC_CLASSES)
        if boxes:
            f.write(f"{img_path} {' '.join(boxes)}\n")
            val_count += 1

print(f"\n‚úÖ Train: {train_count} images")
print(f"‚úÖ Val: {val_count} images")

# Create VOC classes file
os.makedirs('model_data', exist_ok=True)
with open('model_data/voc_classes.txt', 'w') as f:
    f.write('\n'.join(VOC_CLASSES))
print("‚úÖ Created voc_classes.txt")

# === Generate RTTS Test Annotations ===
print("\n" + "="*60)
print("Generating RTTS Test Annotations")
print("="*60)

test_count = 0
with open('rtts_test.txt', 'w') as f:
    for xml_file in os.listdir(TEST_ANN):
        if not xml_file.endswith('.xml'):
            continue
        img_id = xml_file[:-4]
        img_file = None
        for ext in ['.jpg', '.png', '.jpeg', '.JPG']:
            if os.path.exists(os.path.join(TEST_IMG, img_id + ext)):
                img_file = img_id + ext
                break
        if img_file:
            boxes = convert_annotation(os.path.join(TEST_ANN, xml_file), RTTS_CLASSES)
            if boxes:
                f.write(f"{TEST_IMG}/{img_file} {' '.join(boxes)}\n")
                test_count += 1

print(f"‚úÖ RTTS test: {test_count} images")

# Create RTTS classes file
with open('model_data/rtts_classes.txt', 'w') as f:
    f.write('\n'.join(RTTS_CLASSES))
print("‚úÖ Created rtts_classes.txt")
print("\n" + "="*60)

## 4Ô∏è‚É£ Setup Config for Fresh Training

In [None]:
# Config for FRESH training (using only pretrained backbone)
config_content = '''
import os

# Dataset Paths
TRAIN_FOG_PATH = '/content/drive/MyDrive/dataset/training/VOC2007 2/FOG'
TRAIN_ANN_PATH = '/content/drive/MyDrive/dataset/training/VOC2007 2/Annotations'
TEST_IMG_PATH = '/content/drive/MyDrive/dataset/RTTS/VOC2007/JPEGImages'
TEST_ANN_PATH = '/content/drive/MyDrive/dataset/RTTS/VOC2007/Annotations'
CHECKPOINT_DIR = '/content/drive/MyDrive/RDFNet_training_checkpoints'

VOC_CLASSES = ["aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", 
               "chair", "cow", "diningtable", "dog", "horse", "motorbike", "person", 
               "pottedplant", "sheep", "sofa", "train", "tvmonitor"]
RTTS_CLASSES = ['bicycle', 'bus', 'car', 'motorbike', 'person']

# Model (fresh training - only backbone weights)
Cuda = True
seed = 114514
distributed = False
sync_bn = False
fp16 = True
model_path = 'model_data/yolov7_tiny_weights.pth'
classes_path = 'model_data/voc_classes.txt'
anchors_path = 'model_data/yolo_anchors.txt'
anchors_mask = [[6, 7, 8], [3, 4, 5], [0, 1, 2]]
input_shape = [640, 640]
pretrained = False

# Training
Init_Epoch = 0
Freeze_Epoch = 100
Freeze_batch_size = 16
UnFreeze_Epoch = 300
Unfreeze_batch_size = 8
Freeze_Train = True

# Optimizer
Init_lr = 1e-2
Min_lr = Init_lr * 0.01
optimizer_type = "sgd"
momentum = 0.937
weight_decay = 5e-4
lr_decay_type = "cos"

# Save
save_period = 10
save_dir = 'logs'
eval_flag = True
eval_period = 10
num_workers = 4

# Annotations
train_annotation_path = '2007_train.txt'
val_annotation_path = '2007_val.txt'
'''

with open('colab_config.py', 'w') as f:
    f.write(config_content)

print("‚úÖ Config set for FRESH training")
print("   Using: yolov7_tiny_weights.pth (backbone only)")
print("   NOT using any saved checkpoints")

## 5Ô∏è‚É£ Resume Training (‚ö†Ô∏è ONLY RUN IF RESUMING)

In [None]:
# =============================================
# ‚ö†Ô∏è RUN THIS CELL ONLY IF YOU WANT TO RESUME
# =============================================
import os
import shutil

CHECKPOINT_DIR = '/content/drive/MyDrive/RDFNet_training_checkpoints'
os.makedirs('logs', exist_ok=True)

# Find latest checkpoint
checkpoints = [f for f in os.listdir(CHECKPOINT_DIR) if f.startswith('ep') and f.endswith('.pth')]

if checkpoints:
    checkpoints.sort(key=lambda x: int(x.split('-')[0][2:]))
    latest = checkpoints[-1]
    latest_epoch = int(latest.split('-')[0][2:])
    
    # Copy to local logs
    shutil.copy2(f'{CHECKPOINT_DIR}/{latest}', f'logs/{latest}')
    
    print(f"üìå Resuming from: {latest}")
    print(f"üìå Starting at epoch: {latest_epoch}")
    
    # Update config for resume
    resume_config = f'''
import os

TRAIN_FOG_PATH = '/content/drive/MyDrive/dataset/training/VOC2007 2/FOG'
TRAIN_ANN_PATH = '/content/drive/MyDrive/dataset/training/VOC2007 2/Annotations'
TEST_IMG_PATH = '/content/drive/MyDrive/dataset/RTTS/VOC2007/JPEGImages'
TEST_ANN_PATH = '/content/drive/MyDrive/dataset/RTTS/VOC2007/Annotations'
CHECKPOINT_DIR = '/content/drive/MyDrive/RDFNet_training_checkpoints'

VOC_CLASSES = ["aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", 
               "chair", "cow", "diningtable", "dog", "horse", "motorbike", "person", 
               "pottedplant", "sheep", "sofa", "train", "tvmonitor"]
RTTS_CLASSES = ['bicycle', 'bus', 'car', 'motorbike', 'person']

Cuda = True
seed = 114514
distributed = False
sync_bn = False
fp16 = True
model_path = 'logs/{latest}'  # Resume from checkpoint
classes_path = 'model_data/voc_classes.txt'
anchors_path = 'model_data/yolo_anchors.txt'
anchors_mask = [[6, 7, 8], [3, 4, 5], [0, 1, 2]]
input_shape = [640, 640]
pretrained = False

Init_Epoch = {latest_epoch}
Freeze_Epoch = 100
Freeze_batch_size = 16
UnFreeze_Epoch = 300
Unfreeze_batch_size = 8
Freeze_Train = {'True' if latest_epoch < 100 else 'False'}

Init_lr = 1e-2
Min_lr = Init_lr * 0.01
optimizer_type = "sgd"
momentum = 0.937
weight_decay = 5e-4
lr_decay_type = "cos"

save_period = 10
save_dir = 'logs'
eval_flag = True
eval_period = 10
num_workers = 4

train_annotation_path = '2007_train.txt'
val_annotation_path = '2007_val.txt'
'''
    with open('colab_config.py', 'w') as f:
        f.write(resume_config)
    print("‚úÖ Config updated for resume!")
else:
    print("‚ùå No checkpoints found. Will start fresh training.")

## 6Ô∏è‚É£ Start Training with Auto-Backup

In [None]:
import subprocess
import time
import os
import shutil

CHECKPOINT_DIR = '/content/drive/MyDrive/RDFNet_training_checkpoints'
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
os.makedirs('logs', exist_ok=True)

def backup_checkpoints():
    """Backup checkpoints to Drive"""
    for f in os.listdir('logs'):
        if f.endswith('.pth'):
            src = f'logs/{f}'
            dst = f'{CHECKPOINT_DIR}/{f}'
            if not os.path.exists(dst) or os.path.getmtime(src) > os.path.getmtime(dst):
                shutil.copy2(src, dst)
                print(f"üíæ Backed up: {f}")

print("üöÄ Starting training...")
print("="*60)

try:
    process = subprocess.Popen(
        ['python', 'colab_train.py'],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        bufsize=1
    )
    
    last_backup = time.time()
    
    for line in process.stdout:
        print(line, end='')
        
        # Backup every 5 minutes
        if time.time() - last_backup > 300:
            backup_checkpoints()
            last_backup = time.time()
    
    process.wait()

except KeyboardInterrupt:
    print("\n‚ö†Ô∏è Training interrupted!")
    process.terminate()

finally:
    backup_checkpoints()
    print("\n" + "="*60)
    print("‚úÖ Checkpoints saved to Drive!")
    print(f"üìÅ Location: {CHECKPOINT_DIR}")

## 7Ô∏è‚É£ Alternative: Simple Training (if Cell 6 has issues)

In [None]:
# Run this if Cell 6 has issues
!python colab_train.py

# Backup checkpoints
import shutil
import os

CHECKPOINT_DIR = '/content/drive/MyDrive/RDFNet_training_checkpoints'
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

for f in os.listdir('logs'):
    if f.endswith('.pth'):
        shutil.copy2(f'logs/{f}', f'{CHECKPOINT_DIR}/{f}')
        print(f"üíæ Backed up: {f}")

print("\n‚úÖ All checkpoints backed up!")

---
# üìä Evaluation on RTTS
---

## 8Ô∏è‚É£ Find Best Checkpoint

In [None]:
import os
import shutil

CHECKPOINT_DIR = '/content/drive/MyDrive/RDFNet_training_checkpoints'
os.makedirs('logs', exist_ok=True)

# Find best checkpoint
best_checkpoint = None

for loc in ['logs', CHECKPOINT_DIR]:
    if os.path.exists(loc):
        files = [f for f in os.listdir(loc) if f.endswith('.pth')]
        
        # Prefer 'best'
        best_files = [f for f in files if 'best' in f.lower()]
        if best_files:
            best_checkpoint = os.path.join(loc, best_files[0])
            break
        
        # Otherwise latest epoch
        epoch_files = [f for f in files if f.startswith('ep')]
        if epoch_files:
            epoch_files.sort(key=lambda x: int(x.split('-')[0][2:]), reverse=True)
            best_checkpoint = os.path.join(loc, epoch_files[0])
            break

if best_checkpoint:
    print(f"üìå Using checkpoint: {best_checkpoint}")
    
    # Copy to local if from Drive
    if CHECKPOINT_DIR in best_checkpoint:
        local_path = f"logs/{os.path.basename(best_checkpoint)}"
        shutil.copy2(best_checkpoint, local_path)
        best_checkpoint = local_path
        print(f"üìã Copied to: {local_path}")
else:
    print("‚ùå No checkpoint found!")
    print("   Using pretrained: model_data/RDFNet.pth")
    best_checkpoint = 'model_data/RDFNet.pth'

## 9Ô∏è‚É£ Evaluate on RTTS

In [None]:
import os
import xml.etree.ElementTree as ET
from PIL import Image
from tqdm import tqdm

from yolo import YOLO
from utils.utils_map import get_map

# Paths
TEST_IMG = '/content/drive/MyDrive/dataset/RTTS/VOC2007/JPEGImages'
TEST_ANN = '/content/drive/MyDrive/dataset/RTTS/VOC2007/Annotations'
RTTS_CLASSES = ['bicycle', 'bus', 'car', 'motorbike', 'person']

# Find checkpoint (from previous cell)
model_path = best_checkpoint if 'best_checkpoint' in dir() else 'model_data/RDFNet.pth'
print(f"üì¶ Using: {model_path}")

# Initialize YOLO
print("\nüîÑ Loading model...")
yolo = YOLO(
    model_path=model_path,
    classes_path='model_data/rtts_classes.txt',
    anchors_path='model_data/yolo_anchors.txt',
    input_shape=[640, 640],
    phi='l',
    confidence=0.5,
    nms_iou=0.3,
    cuda=True
)

# Create output directories
os.makedirs('map_out/ground-truth', exist_ok=True)
os.makedirs('map_out/detection-results', exist_ok=True)

# Process images
xml_files = [f for f in os.listdir(TEST_ANN) if f.endswith('.xml')]
print(f"\nüìä Processing {len(xml_files)} images...")

for xml_file in tqdm(xml_files):
    img_id = xml_file[:-4]
    
    # Find image
    img_path = None
    for ext in ['.jpg', '.png', '.jpeg', '.JPG']:
        p = os.path.join(TEST_IMG, img_id + ext)
        if os.path.exists(p):
            img_path = p
            break
    
    if not img_path:
        continue
    
    # Ground truth
    tree = ET.parse(os.path.join(TEST_ANN, xml_file))
    root = tree.getroot()
    
    with open(f'map_out/ground-truth/{img_id}.txt', 'w') as f:
        for obj in root.iter('object'):
            name = obj.find('name').text.lower()
            if name not in RTTS_CLASSES:
                continue
            bbox = obj.find('bndbox')
            xmin = int(float(bbox.find('xmin').text))
            ymin = int(float(bbox.find('ymin').text))
            xmax = int(float(bbox.find('xmax').text))
            ymax = int(float(bbox.find('ymax').text))
            f.write(f"{name} {xmin} {ymin} {xmax} {ymax}\n")
    
    # Detection
    try:
        image = Image.open(img_path)
        yolo.get_map_txt(img_id, image, RTTS_CLASSES, 'map_out')
    except Exception as e:
        print(f"Error: {img_id} - {e}")

# Calculate mAP
print("\n" + "="*60)
print("üìä Calculating mAP...")
print("="*60)
get_map(0.5, True, path='map_out')

## üîü Save Results to Drive

In [None]:
import shutil
import os

RESULTS_DIR = '/content/drive/MyDrive/RDFNet_results'
os.makedirs(RESULTS_DIR, exist_ok=True)

# Copy mAP results
if os.path.exists('map_out'):
    shutil.copytree('map_out', f'{RESULTS_DIR}/map_out', dirs_exist_ok=True)
    print("‚úÖ Copied mAP results")

# Copy best checkpoint
for loc in ['logs', '/content/drive/MyDrive/RDFNet_training_checkpoints']:
    if os.path.exists(loc):
        for f in os.listdir(loc):
            if ('best' in f.lower() or 'last' in f.lower()) and f.endswith('.pth'):
                shutil.copy2(os.path.join(loc, f), f'{RESULTS_DIR}/{f}')
                print(f"‚úÖ Copied: {f}")

print(f"\nüìÅ Results saved to: {RESULTS_DIR}")