# 🔥 Micro-Job Feature Extraction Pipeline

**Mission**: Eliminate training bottlenecks with resumable feature caching  
**Target**: 4GB VRAM, 64 images per job, <2min per job  
**Strategy**: EfficientNet-B0 encoder → float16 NPZ cache → head-only training

---

## 🎯 Pipeline Overview

1. **Job Queue Creation**: Split dataset into 64-image chunks
2. **Feature Extraction**: Process jobs with encoder (batch_size=8)
3. **Feature Caching**: Save one `features/encoder_<encoder_name>/<image_id>.npz` file per image (float16 features, compressed NPZ)
4. **Manifest Generation**: Create `features/manifest_features.v001.csv`
5. **Resume Logic**: Skip completed jobs via `.done` files

### 📊 Resource Targets
- **VRAM**: <2.5GB peak (within 4GB constraint)
- **Speed**: 64 images in <2 minutes
- **Storage**: ~50MB per 1000 images (float16 features stored in compressed NPZ files)
- **Quality**: Equivalent to full training pipeline

In [None]:
# 🔧 Setup & Imports
import os
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import time
import json
import shutil
from datetime import datetime
from typing import List, Dict, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

# ML Libraries
import torch
import torch.nn as nn
import timm
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from PIL import Image
from tqdm.notebook import tqdm

# Project imports
sys.path.append('../src')
from data_utils import ImageFolderAlb

# 🎮 Device & Memory Setup
# Use the default CUDA device when one is visible to PyTorch, else the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

if device.type != 'cuda':
    print("⚠️ Running on CPU - feature extraction will be slower")
else:
    # Release cached allocator blocks left over from earlier cells before reporting.
    torch.cuda.empty_cache()
    print(f"🚀 GPU: {torch.cuda.get_device_name(0)}")
    print(f"💾 VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f}GB")

print(f"🔧 PyTorch: {torch.__version__}")
print(f"📁 Working dir: {Path.cwd()}")

In [None]:
# ⚙️ Configuration
# Single source of truth for paths, job sizing and extraction options; every
# later cell reads its settings from this dict.
CONFIG = dict(
    # Paths
    data_dir='../data',
    features_dir='../features',
    encoder_name='efficientnet_b0',

    # Job sizing (chosen to stay inside a 4GB VRAM budget)
    job_size=64,              # images per micro-job
    batch_size=8,             # images per forward pass (bounded by VRAM)
    img_size=224,             # square input resolution fed to the encoder
    feature_dtype='float16',  # dtype the features are cast to before saving

    # Feature extraction
    # NOTE(review): use_global_pool / extract_spatial are not read by the
    # pipeline code in this notebook; pooling is fixed in FeatureExtractor.
    use_global_pool=True,
    extract_spatial=False,      # spatial maps not needed for head-only training
    normalize_features=True,    # L2-normalise each feature vector

    # DataLoader performance
    num_workers=4,              # worker processes for image decoding
    pin_memory=True,            # page-locked host buffers for faster H2D copies
    prefetch_factor=2,          # batches each worker loads ahead
)

print(
    "🎯 MICRO-JOB CONFIGURATION:",
    f"   📊 Job size: {CONFIG['job_size']} images",
    f"   🎬 Batch size: {CONFIG['batch_size']} (VRAM-safe)",
    f"   📐 Image size: {CONFIG['img_size']}px",
    f"   🗜️ Feature dtype: {CONFIG['feature_dtype']}",
    f"   🏗️ Encoder: {CONFIG['encoder_name']}",
    sep="\n",
)

In [None]:
# 📊 Dataset Scanning & Job Queue Creation

def scan_dataset(data_dir: str) -> pd.DataFrame:
    """Scan a class-per-subdirectory image dataset and build an image manifest.

    ``data_dir`` is expected to contain one sub-directory per class. Every
    regular file directly inside a class directory whose extension is
    .jpg/.jpeg/.png/.bmp (case-insensitive) becomes one manifest row.
    Sub-directories are not searched recursively.

    Class directories and files are visited in sorted order. This makes the
    manifest, and therefore the seeded shuffle in ``create_job_queue``,
    reproducible. The resume logic keys ``.done`` markers by job index, so the
    job -> image mapping must not depend on filesystem iteration order.

    Args:
        data_dir: Root directory of the dataset.

    Returns:
        DataFrame with columns ``image_path``, ``class_name``, ``image_id``
        (``"<class>_<file stem>"``) and ``file_size`` (bytes). When no images
        are found it is empty but still has those columns.

    Raises:
        FileNotFoundError: If ``data_dir`` does not exist.
    """
    print(f"🔍 Scanning dataset: {data_dir}")

    data_path = Path(data_dir)
    if not data_path.exists():
        raise FileNotFoundError(f"Data directory not found: {data_dir}")

    image_exts = {'.jpg', '.jpeg', '.png', '.bmp'}
    columns = ['image_path', 'class_name', 'image_id', 'file_size']

    # Collect all images (sorted for a deterministic manifest order)
    images = []
    for class_dir in sorted(data_path.iterdir()):
        if not class_dir.is_dir():
            continue

        class_name = class_dir.name
        print(f"   📁 Processing class: {class_name}")

        for img_file in sorted(class_dir.glob('*')):
            # is_file() skips directories that merely have an image-like suffix
            if img_file.is_file() and img_file.suffix.lower() in image_exts:
                images.append({
                    'image_path': str(img_file),
                    'class_name': class_name,
                    'image_id': f"{class_name}_{img_file.stem}",
                    'file_size': img_file.stat().st_size
                })

    # Explicit columns keep the summary below working on an empty dataset.
    df = pd.DataFrame(images, columns=columns)

    # image_id is used as the cache filename; e.g. "a.jpg" and "a.png" in the
    # same class map to the same id and would overwrite each other's features.
    duplicated = df['image_id'].duplicated(keep=False)
    if duplicated.any():
        print(f"⚠️ {int(duplicated.sum())} images share an image_id "
              f"(same class and file stem); their cached features will collide")

    print(f"\n✅ Dataset scan complete:")
    print(f"   🖼️ Total images: {len(df):,}")
    print(f"   🏷️ Classes: {df['class_name'].nunique()}")
    print(f"   💾 Total size: {df['file_size'].sum() / 1e9:.2f}GB")

    return df

def create_job_queue(image_df: pd.DataFrame, job_size: int = 64) -> pd.DataFrame:
    """Split an image manifest into fixed-size micro-jobs.

    Rows are shuffled with a fixed seed (``random_state=42``) so that jobs mix
    classes while staying reproducible for a given input order. The shuffled
    rows are then cut into consecutive chunks of ``job_size``; the last chunk
    may be smaller.

    Args:
        image_df: Manifest with ``image_path``, ``image_id`` and ``class_name``
            columns (as returned by ``scan_dataset``).
        job_size: Maximum number of images per job; must be positive.

    Returns:
        One row per job with these columns:
        ``job_id`` (0-based, equal to the row position);
        ``image_paths`` / ``image_ids`` (comma-joined, in matching order);
        ``num_images``; ``classes`` (comma-joined unique class names);
        ``status`` (``'pending'``); and ``created_at`` (ISO timestamp).

    Raises:
        ValueError: If ``job_size`` is not positive, or if any image path or
            id contains a comma (it would corrupt the comma-joined fields).
    """
    if job_size <= 0:
        raise ValueError(f"job_size must be a positive integer, got {job_size}")

    print(f"\n📋 Creating job queue (job_size={job_size})...")

    columns = ['job_id', 'image_paths', 'image_ids', 'num_images',
               'classes', 'status', 'created_at']

    if image_df.empty:
        print("⚠️ No images to schedule - job queue is empty")
        return pd.DataFrame(columns=columns)

    # Paths and ids are stored comma-joined and split again on ',' when a job
    # runs. A comma inside any value would shift the path/id pairing and store
    # features under the wrong image_id, so reject it up front.
    for col in ('image_path', 'image_id'):
        has_comma = image_df[col].astype(str).str.contains(',', regex=False)
        if has_comma.any():
            example = image_df.loc[has_comma, col].iloc[0]
            raise ValueError(f"{col} values must not contain ',' (e.g. {example!r})")

    # Shuffle for balanced jobs across classes
    shuffled_df = image_df.sample(frac=1, random_state=42).reset_index(drop=True)

    # Create job chunks
    jobs = []
    for start in range(0, len(shuffled_df), job_size):
        job_images = shuffled_df.iloc[start:start + job_size]

        jobs.append({
            'job_id': len(jobs),
            'image_paths': ','.join(job_images['image_path'].tolist()),
            'image_ids': ','.join(job_images['image_id'].tolist()),
            'num_images': len(job_images),
            'classes': ','.join(job_images['class_name'].unique()),
            'status': 'pending',
            'created_at': datetime.now().isoformat()
        })

    job_df = pd.DataFrame(jobs, columns=columns)
    print(f"✅ Job queue created: {len(job_df)} jobs")
    print(f"   📊 Average job size: {job_df['num_images'].mean():.1f} images")
    print(f"   🎯 Estimated time: {len(job_df) * 2:.0f} minutes (2min/job)")

    return job_df

# Scan the dataset on disk, then cut the resulting manifest into micro-jobs.
image_manifest = scan_dataset(data_dir=CONFIG['data_dir'])
job_queue = create_job_queue(image_df=image_manifest, job_size=CONFIG['job_size'])

In [None]:
# 🏗️ Feature Extraction Setup

class FeatureExtractor(nn.Module):
    """Wrap a timm backbone so it returns globally average-pooled features.

    The backbone is created with its classifier removed (``num_classes=0``)
    and ``global_pool='avg'``, so ``forward`` maps an image batch to a
    ``[B, feature_dim]`` tensor.

    Args:
        encoder_name: Any model name accepted by ``timm.create_model``.
        pretrained: Load pretrained weights.

    Attributes:
        encoder_name: The timm model name used.
        backbone: The timm model.
        feature_dim: Width of the pooled feature vector, measured with one
            dummy forward pass at construction time.
    """

    def __init__(self, encoder_name: str = 'efficientnet_b0', pretrained: bool = True):
        super().__init__()
        self.encoder_name = encoder_name

        # Load pretrained encoder
        self.backbone = timm.create_model(
            encoder_name,
            pretrained=pretrained,
            num_classes=0,  # Remove classifier head
            global_pool='avg'  # Global average pooling
        )

        # Probe the output width with one dummy forward pass. The probe runs
        # in eval mode: in train mode every BatchNorm layer would fold this
        # random-noise batch into its running mean/var, degrading the
        # pretrained statistics later used for inference. The original mode
        # is restored afterwards so construction has no other side effects.
        was_training = self.backbone.training
        self.backbone.eval()
        with torch.no_grad():
            dummy_input = torch.randn(1, 3, 224, 224)
            dummy_output = self.backbone(dummy_input)
            self.feature_dim = dummy_output.shape[1]
        self.backbone.train(was_training)

        print(f"🏗️ Feature extractor: {encoder_name}")
        print(f"   📐 Feature dim: {self.feature_dim}")
        print(f"   💾 Parameters: {sum(p.numel() for p in self.parameters()):,}")

    def forward(self, x):
        """Return pooled features of shape ``[B, feature_dim]`` for image batch ``x``."""
        features = self.backbone(x)  # [B, feature_dim]
        return features

class ImageDataset(Dataset):
    """Map-style dataset yielding ``(image, path)`` pairs for feature extraction.

    Each item is decoded with PIL and converted to RGB, then passed through
    ``transform`` if one was given. If an image fails to load, the error is
    printed and a 224x224 black image is substituted so that one bad file
    does not abort the whole job. A feature is still produced (and cached)
    for such an image.

    Args:
        image_paths: Paths of the images, in the order they should be served.
        transform: Optional callable applied to the PIL image.
    """

    def __init__(self, image_paths: List[str], transform=None):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]

        # Load image
        try:
            # The context manager closes the underlying file handle promptly.
            # convert() returns a new, fully loaded image that stays valid
            # after the file is closed.
            with Image.open(img_path) as img:
                image = img.convert('RGB')
        except Exception as e:
            print(f"⚠️ Error loading {img_path}: {e}")
            # Return black image as fallback
            image = Image.new('RGB', (224, 224), (0, 0, 0))

        if self.transform:
            image = self.transform(image)

        return image, img_path

# Build the encoder once on the target device and put it in inference mode.
# nn.Module.eval() returns the module itself, so it can be chained after .to().
feature_extractor = FeatureExtractor(CONFIG['encoder_name']).to(device).eval()

# Minimal preprocessing: fixed square resize (aspect ratio is not preserved),
# tensor conversion, then normalisation with the ImageNet channel statistics.
transform = transforms.Compose(
    [
        transforms.Resize((CONFIG['img_size'],) * 2),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

print("✅ Feature extraction setup complete")
print(f"   🎯 Ready for {CONFIG['job_size']}-image micro-jobs")

In [None]:
# 🔥 Core Job Execution Function

def run_feature_job(job_id: int, job_queue: pd.DataFrame, force_rerun: bool = False) -> bool:
    """Execute a single feature-extraction micro-job.

    Loads the job's images and runs them through the module-level
    ``feature_extractor`` in batches of ``CONFIG['batch_size']``. The pooled
    features are optionally L2-normalised, cast to ``CONFIG['feature_dtype']``,
    and written as one compressed ``<image_id>.npz`` per image into
    ``<features_dir>/encoder_<encoder_name>/``.

    Only after every file has been written is a JSON marker
    ``_job_<id:04d>_<unix-seconds>.done`` created in ``features_dir``. While
    such a marker exists, later calls skip the job unless ``force_rerun`` is
    set. A successful forced rerun replaces the job's older markers.

    Args:
        job_id: Row position of the job in ``job_queue`` (0-based).
        job_queue: Queue produced by ``create_job_queue``.
        force_rerun: Re-extract even if a ``.done`` marker already exists.

    Returns:
        True if the job completed (now or on an earlier run). False if
        ``job_id`` is out of range or extraction failed; failures print their
        traceback.
    """
    # Reject negative ids too: iloc would silently wrap them around to jobs
    # counted from the end of the queue.
    if not 0 <= job_id < len(job_queue):
        print(f"❌ Job ID {job_id} out of range (max: {len(job_queue)-1})")
        return False

    job = job_queue.iloc[job_id]

    # Create output directories
    features_dir = Path(CONFIG['features_dir'])
    encoder_dir = features_dir / f"encoder_{CONFIG['encoder_name']}"
    encoder_dir.mkdir(parents=True, exist_ok=True)

    # Check if job already completed
    marker_prefix = f"_job_{job_id:04d}_"
    existing_done = sorted(features_dir.glob(f"{marker_prefix}*.done"))

    if existing_done and not force_rerun:
        print(f"✅ Job {job_id} already completed: {existing_done[0].name}")
        return True

    print(f"\n🚀 Starting job {job_id}/{len(job_queue)-1}")
    print(f"   📊 Images: {job['num_images']}")
    print(f"   🏷️ Classes: {job['classes']}")

    start_time = time.time()
    done_file = features_dir / f"{marker_prefix}{int(start_time)}.done"

    try:
        # Parse image paths
        image_paths = job['image_paths'].split(',')
        image_ids = job['image_ids'].split(',')
        # Paths and ids are paired positionally below; fail loudly instead of
        # saving features under the wrong id if the counts disagree.
        if len(image_paths) != len(image_ids):
            raise ValueError(
                f"job has {len(image_paths)} paths but {len(image_ids)} ids "
                "(a path or id probably contains ',')"
            )

        # Create dataset and dataloader
        dataset = ImageDataset(image_paths, transform=transform)
        loader_kwargs = {
            'batch_size': CONFIG['batch_size'],
            'shuffle': False,  # output order must stay aligned with image_ids
            'num_workers': CONFIG['num_workers'],
            # Pinned host memory only helps host->GPU copies.
            'pin_memory': CONFIG['pin_memory'] and device.type == 'cuda',
        }
        # prefetch_factor is only valid with worker processes; DataLoader
        # raises ValueError if it is given while num_workers == 0.
        if CONFIG['num_workers'] > 0:
            loader_kwargs['prefetch_factor'] = CONFIG['prefetch_factor']
        dataloader = DataLoader(dataset, **loader_kwargs)

        # Extract features
        all_features = []
        all_paths = []

        with torch.no_grad():
            for batch_images, batch_paths in tqdm(dataloader,
                                                 desc=f"Job {job_id}",
                                                 leave=False):
                batch_images = batch_images.to(device, non_blocking=True)

                features = feature_extractor(batch_images)  # [B, feature_dim]

                # Normalise before the down-cast so it runs in the model's precision.
                if CONFIG['normalize_features']:
                    features = torch.nn.functional.normalize(features, p=2, dim=1)

                all_features.append(features.cpu().numpy().astype(CONFIG['feature_dtype']))
                all_paths.extend(batch_paths)

        all_features = np.concatenate(all_features, axis=0)

        print(f"   ✅ Extracted: {all_features.shape} features")

        # Save one file per image; iterating the 2-D array yields its rows.
        saved_count = 0
        for feature_vec, img_path, img_id in zip(all_features, all_paths, image_ids):
            np.savez_compressed(
                encoder_dir / f"{img_id}.npz",
                features=feature_vec,
                image_path=img_path,
                image_id=img_id,
                encoder_name=CONFIG['encoder_name'],
                extraction_time=datetime.now().isoformat()
            )
            saved_count += 1

        # Create completion marker (only after every feature file exists)
        job_metadata = {
            'job_id': job_id,
            'num_images': len(image_paths),
            'feature_shape': list(all_features.shape),
            'processing_time': time.time() - start_time,
            'encoder_name': CONFIG['encoder_name'],
            'config': CONFIG,
            'completed_at': datetime.now().isoformat()
        }

        with open(done_file, 'w', encoding='utf-8') as f:
            json.dump(job_metadata, f, indent=2)

        # A forced rerun would otherwise leave earlier markers behind; keep
        # exactly one marker describing the latest extraction.
        for stale in existing_done:
            if stale != done_file:
                stale.unlink(missing_ok=True)

        elapsed = time.time() - start_time
        print(f"✅ Job {job_id} completed: {saved_count} features saved in {elapsed:.1f}s")
        print(f"   💾 Output: {encoder_dir}/")
        print(f"   🏁 Done marker: {done_file.name}")

        return True

    except Exception as e:
        # Top-level boundary for one job: report and let the caller continue.
        print(f"❌ Job {job_id} failed: {e}")
        import traceback
        traceback.print_exc()
        return False

# Short usage reminder for the job runner defined above.
print(
    "🔥 Job execution function ready",
    "   Usage: run_feature_job(job_id, job_queue)",
    "   Target: <2 minutes per 64-image job",
    sep="\n",
)

In [None]:
# 🧪 TEST: Single Job Execution
# Run this cell to test the pipeline with job 0

TEST_JOB_ID = 0

test_job = job_queue.iloc[TEST_JOB_ID]
print(f"🧪 Testing job execution with job {TEST_JOB_ID}")
print(f"Expected: {test_job['num_images']} images processed")

# Clear GPU memory before test
if torch.cuda.is_available():
    torch.cuda.empty_cache()
    print(f"🧹 GPU memory cleared")

# Run test job (force_rerun so the pipeline actually executes)
success = run_feature_job(TEST_JOB_ID, job_queue, force_rerun=True)

if success:
    # Verify outputs: count only this job's files, because the encoder
    # directory may already contain features written by other jobs.
    encoder_dir = Path(CONFIG['features_dir']) / f"encoder_{CONFIG['encoder_name']}"
    expected_ids = test_job['image_ids'].split(',')
    feature_files = [encoder_dir / f"{image_id}.npz" for image_id in expected_ids]
    feature_files = [path for path in feature_files if path.exists()]
    done_files = list(Path(CONFIG['features_dir']).glob(f'_job_{TEST_JOB_ID:04d}_*.done'))

    print(f"\n✅ TEST RESULTS:")
    print(f"   📁 Feature files created: {len(feature_files)}/{len(expected_ids)}")
    print(f"   🏁 Done files created: {len(done_files)}")

    # Test loading a feature file
    if feature_files:
        # NpzFile keeps the archive open until closed; the context manager
        # releases it. Each ['features'] access decompresses, so read once.
        with np.load(feature_files[0]) as test_feature:
            sample_features = test_feature['features']
        print(f"   🧪 Sample feature shape: {sample_features.shape}")
        print(f"   🗜️ Feature dtype: {sample_features.dtype}")

        # Check memory usage (in-memory array size, not the compressed file size)
        feature_size = sample_features.nbytes
        total_estimated = feature_size * len(image_manifest) / 1e6
        print(f"   💾 Per-feature size: {feature_size} bytes")
        print(f"   📊 Estimated total: {total_estimated:.1f}MB for full dataset")

    print(f"\n🎯 Test job completed successfully!")
else:
    print(f"❌ Test job failed - check error messages above")

In [None]:
# 🏭 Batch Job Execution (Full Pipeline)
# WARNING: This will process ALL jobs - use for full feature extraction

def run_all_jobs(job_queue: pd.DataFrame, max_jobs: Optional[int] = None,
                 start_job: int = 0) -> Dict:
    """Run a contiguous window of jobs sequentially with progress tracking.

    Jobs ``start_job`` up to ``start_job + max_jobs - 1`` are executed, clipped
    to the end of the queue; ``max_jobs=None`` means "through the last job".
    ``run_feature_job`` is called without ``force_rerun``. Jobs that already
    have a ``.done`` marker are therefore skipped and still counted as
    completed.

    Args:
        job_queue: Queue produced by ``create_job_queue``.
        max_jobs: Maximum number of jobs to run; ``0`` runs none.
        start_job: Index of the first job to run.

    Returns:
        Dict with ``completed_jobs`` (list of ``{'job_id', 'time', 'images'}``),
        ``failed_jobs`` (list of job ids), ``total_time`` (seconds) and
        ``total_features`` (images in completed jobs).

    Raises:
        ValueError: If ``start_job`` or ``max_jobs`` is negative.
    """
    if start_job < 0:
        raise ValueError(f"start_job must be >= 0, got {start_job}")
    if max_jobs is not None and max_jobs < 0:
        raise ValueError(f"max_jobs must be >= 0, got {max_jobs}")

    # `is None` (not truthiness) so that max_jobs=0 really runs nothing, and
    # the window is clipped to the queue so totals account for start_job.
    end_job = len(job_queue) if max_jobs is None else min(len(job_queue), start_job + max_jobs)
    job_ids = range(start_job, end_job)  # empty when start_job >= end_job
    total_jobs = len(job_ids)

    print(f"🏭 BATCH JOB EXECUTION")
    print(f"   📊 Total jobs: {total_jobs}")
    print(f"   🎯 Estimated time: {total_jobs * 2:.0f} minutes")
    print(f"   💾 Estimated storage: {total_jobs * CONFIG['job_size'] * 0.05:.1f}MB")

    results = {
        'completed_jobs': [],
        'failed_jobs': [],
        'total_time': 0,
        'total_features': 0
    }

    start_time = time.time()

    for job_id in tqdm(job_ids, desc="Processing jobs"):

        job_start = time.time()
        success = run_feature_job(job_id, job_queue)
        job_time = time.time() - job_start

        if success:
            # Plain int keeps the results dict free of numpy scalar types.
            num_images = int(job_queue.iloc[job_id]['num_images'])
            results['completed_jobs'].append({
                'job_id': job_id,
                'time': job_time,
                'images': num_images
            })
            results['total_features'] += num_images
        else:
            results['failed_jobs'].append(job_id)

        # Clear GPU memory periodically
        if job_id % 10 == 0 and torch.cuda.is_available():
            torch.cuda.empty_cache()

    results['total_time'] = time.time() - start_time

    print(f"\n🏁 BATCH EXECUTION COMPLETE")
    print(f"   ✅ Completed: {len(results['completed_jobs'])}/{total_jobs} jobs")
    print(f"   ❌ Failed: {len(results['failed_jobs'])} jobs")
    print(f"   ⏱️ Total time: {results['total_time']/60:.1f} minutes")
    print(f"   🖼️ Total features: {results['total_features']:,}")

    if results['completed_jobs']:
        avg_time = np.mean([j['time'] for j in results['completed_jobs']])
        print(f"   📊 Average job time: {avg_time:.1f}s")

    return results

# Full extraction is disabled by default: it processes every job and may take
# hours. To run it, uncomment the call below and adjust max_jobs (start small,
# e.g. 5 jobs, to validate the setup first).

# results = run_all_jobs(job_queue, max_jobs=5)  # Test with 5 jobs first

print(
    "⚠️ Batch execution commented out for safety",
    "Uncomment and modify max_jobs parameter to run full extraction",
    f"Total jobs available: {len(job_queue)}",
    sep="\n",
)

In [None]:
# 📊 Feature Manifest Generation

def create_feature_manifest(features_dir: str) -> pd.DataFrame:
    """Build a one-row-per-image manifest from the cached ``.npz`` feature files.

    Reads every ``*.npz`` in ``<features_dir>/encoder_<CONFIG['encoder_name']>/``
    (in sorted filename order) and records its identifiers, feature
    shape/dtype, on-disk size and extraction timestamp. The class is derived
    from the parent directory of the stored image path. Files that cannot be
    read are reported and skipped.

    Args:
        features_dir: Root feature directory (``CONFIG['features_dir']``).

    Returns:
        DataFrame with columns ``image_id``, ``image_path``, ``feature_file``,
        ``encoder_name``, ``feature_shape``, ``feature_dtype``, ``file_size``,
        ``extraction_time`` (None when absent) and ``class_name``. The frame
        is empty when the encoder directory or any readable file is missing.
    """

    print(f"📊 Creating feature manifest from {features_dir}")

    features_path = Path(features_dir)
    encoder_dir = features_path / f"encoder_{CONFIG['encoder_name']}"

    if not encoder_dir.exists():
        print(f"⚠️ Encoder directory not found: {encoder_dir}")
        return pd.DataFrame()

    # Collect all feature files (sorted for a deterministic manifest order)
    feature_files = sorted(encoder_dir.glob('*.npz'))
    print(f"   📁 Found {len(feature_files)} feature files")

    if not feature_files:
        print("   ⚠️ No feature files found")
        return pd.DataFrame()

    manifest_data = []

    for feature_file in tqdm(feature_files, desc="Building manifest"):
        try:
            with np.load(feature_file) as data:
                # String fields come back as 0-d numpy arrays. Convert them
                # with str() before use: pathlib.Path() raises TypeError when
                # given an ndarray.
                image_path = str(data['image_path'])
                # Reading 'features' decompresses the array; done once for shape/dtype.
                features = data['features']
                manifest_data.append({
                    'image_id': str(data['image_id']),
                    'image_path': image_path,
                    'feature_file': str(feature_file),
                    'encoder_name': str(data['encoder_name']),
                    'feature_shape': features.shape,
                    'feature_dtype': str(features.dtype),
                    'file_size': feature_file.stat().st_size,
                    'extraction_time': (str(data['extraction_time'])
                                        if 'extraction_time' in data.files else None),
                    'class_name': Path(image_path).parent.name
                })
        except Exception as e:
            # Best-effort per file: one corrupt archive should not abort the manifest.
            print(f"   ⚠️ Error reading {feature_file}: {e}")

    if not manifest_data:
        print("   ❌ No valid feature files found")
        return pd.DataFrame()

    manifest_df = pd.DataFrame(manifest_data)

    # Add summary statistics
    print(f"\n✅ Feature manifest created:")
    print(f"   📊 Total features: {len(manifest_df):,}")
    print(f"   🏷️ Classes: {manifest_df['class_name'].nunique()}")
    print(f"   🗜️ Feature dtype: {manifest_df['feature_dtype'].iloc[0]}")
    print(f"   📐 Feature shape: {manifest_df['feature_shape'].iloc[0]}")
    print(f"   💾 Total size: {manifest_df['file_size'].sum() / 1e6:.1f}MB")

    # Class distribution
    class_counts = manifest_df['class_name'].value_counts()
    print(f"\n📋 Class distribution (top 10):")
    for class_name, count in class_counts.head(10).items():
        print(f"   {class_name}: {count} features")

    return manifest_df

def save_manifest(manifest_df: pd.DataFrame, features_dir: str) -> str:
    """Write *manifest_df* (without its index) to ``<features_dir>/manifest_features.v001.csv``.

    Args:
        manifest_df: Manifest to persist, e.g. from ``create_feature_manifest``.
        features_dir: Existing directory that receives the CSV.

    Returns:
        The path of the written CSV, as a string.
    """
    out_path = Path(features_dir).joinpath('manifest_features.v001.csv')
    manifest_df.to_csv(out_path, index=False)

    print(f"💾 Manifest saved: {out_path}")
    return f"{out_path}"

# Build and persist the manifest, but only once the encoder's feature folder exists.
encoder_dir = Path(CONFIG['features_dir']).joinpath(f"encoder_{CONFIG['encoder_name']}")

if not encoder_dir.exists():
    print(f"📋 Manifest will be created after feature extraction")
    print(f"Expected location: {CONFIG['features_dir']}/manifest_features.v001.csv")
else:
    manifest = create_feature_manifest(CONFIG['features_dir'])
    if manifest.empty:
        print("⚠️ No features found - run feature extraction jobs first")
    else:
        manifest_file = save_manifest(manifest, CONFIG['features_dir'])
        print(f"✅ Feature pipeline ready for head-only training!")