## What changed?

This notebook takes the results of `250813_nda_all` and attempts to optimize a single model instead of an ensemble.

In [1]:
# Check that PyTorch can see a CUDA device before any training is attempted.

import torch

cuda_ready = torch.cuda.is_available()

# Build information first, then device details (only meaningful with CUDA).
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {cuda_ready}")
print(f"CUDA version: {torch.version.cuda}")
print(f"GPU count: {torch.cuda.device_count()}")

if not cuda_ready:
    print("❌ CUDA not detected by PyTorch")
else:
    gpu_name = torch.cuda.get_device_name(0)
    gpu_bytes = torch.cuda.get_device_properties(0).total_memory
    print(f"GPU name: {gpu_name}")
    # total_memory is reported in bytes; show it in GiB.
    print(f"GPU memory: {gpu_bytes / 1024**3:.1f} GB")

PyTorch version: 2.5.1+cu121
CUDA available: True
CUDA version: 12.1
GPU count: 1
GPU name: NVIDIA GeForce RTX 2060
GPU memory: 6.0 GB


### Fine-tuning hyperparameters of ghostnet_100

- Goal: recapture the earlier result (✓ Val: 84.2%, Test: 81.2%)

In [None]:
import time
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import timm
import numpy as np
import cv2
import random
import json
import os
import glob
import itertools
from datetime import datetime
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from collections import Counter
import warnings
warnings.filterwarnings('ignore')

# GhostNet Hyperparameter Search Space
# Each key lists the candidate values for one hyperparameter. The tuner builds
# the Cartesian product of all lists (34,992 combinations with the values
# below), shuffles it, and trains only the first MAX_COMBINATIONS entries.
HYPERPARAMETER_GRID = {
    'backbone_lr': [0.0001, 0.0003, 0.0005],  # LR for trainable parameters outside the classifier head
    'classifier_lr': [0.0005, 0.001, 0.002],  # LR for parameters whose name contains 'classifier'
    'weight_decay': [0.01, 0.03, 0.05],  # Passed to AdamW / SGD
    'batch_size': [8, 12, 16],  # Used for the train, val and test loaders alike
    'optimizer': ['adamw', 'sgd'],  # SGD is created with momentum=0.9
    'scheduler': ['cosine', 'plateau'],  # CosineAnnealingLR or ReduceLROnPlateau
    'dropout': [0.2, 0.3, 0.4, 0.5],  # Dropout inserted in front of the replacement classifier layer
    'label_smoothing': [0.05, 0.1, 0.15],  # Passed to CrossEntropyLoss
    'augmentation_strength': ['light', 'medium', 'heavy'],  # Preset name for enhanced_augment_image
    'freeze_layers': [2, 3, 4]  # 1-based index into the freeze patterns: 2 = conv_stem + blocks.0, 3 adds blocks.1, 4 adds blocks.2
}

# Target size for every loaded image; it is reversed before being handed to
# cv2.resize (presumably (height, width) — both values are equal here).
IMAGE_SIZE = (224, 224)
# Minimum number of samples per class after balancing; the effective target is
# max(AUGMENTATION_TARGET, size of the largest class).
AUGMENTATION_TARGET = 1000
MAX_COMBINATIONS = 30  # Number of randomly selected combinations that are actually trained

def detect_and_convert_image(image):
    """Return ``image`` as a 3-channel array, converting when needed.

    Conversions applied:
      * 2-D array (H, W)        -> GRAY2RGB
      * 3-D array with 1 channel -> GRAY2RGB
      * 3-D array with 4 channels -> BGRA2RGB (alpha dropped)

    A 3-channel array, or any array of another rank / channel count, is
    returned unchanged (the very same object).
    """
    rank = len(image.shape)

    # Plain single-plane grayscale.
    if rank == 2:
        return cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)

    # Anything that is not H x W x C is passed through untouched.
    if rank != 3:
        return image

    channels = image.shape[2]
    if channels == 1:
        return cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
    if channels == 4:
        return cv2.cvtColor(image, cv2.COLOR_BGRA2RGB)

    # Three channels (or an unexpected channel count): nothing to convert.
    return image

# Default glob patterns (the original hard-coded locations). Pass
# ``color_pattern`` / ``gray_pattern`` to load_combined_data to use other folders.
_DEFAULT_COLOR_PATTERN = "G:\\Dropbox\\AI Projects\\buck\\images\\squared\\color\\*_NDA.png"
_DEFAULT_GRAY_PATTERN = "G:\\Dropbox\\AI Projects\\buck\\images\\squared\\grayscale\\*_NDA.png"


def _parse_age_from_filename(img_path):
    """Extract the age encoded in an image filename, or None if unusable.

    The filename stem is split on '_' and must have at least five parts; the
    fourth part (index 3) holds the age with 'p' as the decimal separator
    (e.g. ``2p5`` -> 2.5). Tokens containing 'xpx' (any case), tokens without
    a lowercase 'p', and tokens that do not parse as a float yield None.
    """
    stem = os.path.splitext(os.path.basename(img_path))[0]
    parts = stem.split('_')
    if len(parts) < 5:
        return None

    age_part = parts[3]
    if 'xpx' in age_part.lower() or 'p' not in age_part:
        return None

    try:
        return float(age_part.replace('p', '.'))
    except ValueError:
        return None


def _read_image_as_rgb(img_path, source):
    """Read one image file and return it as a 3-channel RGB array (or None)."""
    # Color files are force-decoded to 3 channels; grayscale files keep their
    # stored layout so detect_and_convert_image can expand them.
    flag = cv2.IMREAD_COLOR if source == 'color' else cv2.IMREAD_UNCHANGED
    img = cv2.imread(img_path, flag)
    if img is None:
        return None

    # OpenCV decodes 3-channel images as BGR. Convert here for both sources so
    # a 3-channel file in the grayscale folder is not left channel-swapped.
    if img.ndim == 3 and img.shape[2] == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    return detect_and_convert_image(img)


def _load_image_folder(pattern, source):
    """Load every usable image matching ``pattern``; return (images, ages)."""
    images = []
    ages = []
    for img_path in glob.glob(pattern):
        # Parse the label first so unusable files are never decoded.
        age_value = _parse_age_from_filename(img_path)
        if age_value is None:
            continue

        try:
            img = _read_image_as_rgb(img_path, source)
            if img is None:
                continue
            img_resized = cv2.resize(img, IMAGE_SIZE[::-1])
        except Exception as exc:
            # Best-effort loading: keep going, but say which file failed and why.
            print(f"  Warning: skipped {os.path.basename(img_path)} "
                  f"({type(exc).__name__}: {exc})")
            continue

        images.append(img_resized)
        ages.append(age_value)

    return images, ages


def load_combined_data(color_pattern=None, gray_pattern=None):
    """Load data from both color and grayscale folders.

    Args:
        color_pattern: Glob pattern for the color images. Defaults to the
            original hard-coded Dropbox location.
        gray_pattern: Glob pattern for the grayscale images. Defaults to the
            original hard-coded Dropbox location.

    Returns:
        A tuple ``(images, ages, sources)``:
          * images  - ``np.ndarray`` stacking the resized RGB images
          * ages    - list of float ages, with every age >= 5.5 grouped as 5.5
          * sources - list of ``'color'`` / ``'grayscale'`` tags, one per image
        Only age classes with at least 3 samples are kept. When nothing
        matches, the array is empty and both lists are empty.
    """
    if color_pattern is None:
        color_pattern = _DEFAULT_COLOR_PATTERN
    if gray_pattern is None:
        gray_pattern = _DEFAULT_GRAY_PATTERN

    images = []
    ages = []
    sources = []

    print("Loading color images...")
    color_images, color_ages = _load_image_folder(color_pattern, 'color')
    images.extend(color_images)
    ages.extend(color_ages)
    sources.extend(['color'] * len(color_images))
    print(f"Loaded {len(color_images)} color images")

    print("Loading grayscale images...")
    gray_images, gray_ages = _load_image_folder(gray_pattern, 'grayscale')
    images.extend(gray_images)
    ages.extend(gray_ages)
    sources.extend(['grayscale'] * len(gray_images))
    print(f"Loaded {len(gray_images)} grayscale images")
    print(f"Total images: {len(images)}")

    # Group ages: everything at or above 5.5 becomes a single 5.5 class.
    ages_grouped = [5.5 if age >= 5.5 else age for age in ages]

    # Keep only classes with enough samples.
    age_counts = Counter(ages_grouped)
    valid_ages = {age for age, count in age_counts.items() if count >= 3}

    filtered_images = []
    filtered_ages = []
    filtered_sources = []
    for img, age, source in zip(images, ages_grouped, sources):
        if age in valid_ages:
            filtered_images.append(img)
            filtered_ages.append(age)
            filtered_sources.append(source)

    print(f"Final dataset: {len(filtered_images)} images")
    print(f"Age distribution: {dict(Counter(filtered_ages))}")

    return np.array(filtered_images), filtered_ages, filtered_sources

def enhanced_augment_image(image, strength='medium'):
    """Apply a random chain of augmentations whose intensity depends on ``strength``.

    Steps, each applied with its own probability and in this order: rotation,
    horizontal flip, RGB -> gray -> RGB round trip (fixed 0.4 probability,
    3-channel inputs only), brightness/contrast scaling, gamma correction and
    additive Gaussian noise.

    Args:
        image: Image array; cast to ``uint8`` first if it has another dtype.
        strength: ``'light'`` or ``'medium'``; any other value selects the
            heavy preset.

    Returns:
        The augmented ``uint8`` image (the input object itself when no step fires
        and no cast was needed).
    """
    if image.dtype != np.uint8:
        image = image.astype(np.uint8)

    # (rotation, flip, brightness, gamma, noise) probabilities,
    # max rotation angle in degrees, and the contrast (alpha) range.
    presets = {
        'light': (0.5, 0.3, 0.6, 0.2, 0.1, 10, (0.8, 1.2)),
        'medium': (0.7, 0.5, 0.8, 0.4, 0.3, 15, (0.7, 1.3)),
    }
    heavy_preset = (0.8, 0.6, 0.9, 0.5, 0.4, 20, (0.6, 1.4))
    (p_rotate, p_flip, p_bright, p_gamma, p_noise,
     max_angle, alpha_bounds) = presets.get(strength, heavy_preset)

    # Rotation about the image centre, keeping the original canvas size.
    if random.random() < p_rotate:
        degrees = random.uniform(-max_angle, max_angle)
        height, width = image.shape[:2]
        rotation = cv2.getRotationMatrix2D((width // 2, height // 2), degrees, 1.0)
        image = cv2.warpAffine(image, rotation, (width, height))

    # Horizontal flip
    if random.random() < p_flip:
        image = cv2.flip(image, 1)

    # Strategic color conversion (RGB -> Grayscale -> RGB).
    # The random draw only happens for 3-channel inputs.
    is_three_channel = len(image.shape) == 3 and image.shape[2] == 3
    if is_three_channel and random.random() < 0.4:
        image = cv2.cvtColor(cv2.cvtColor(image, cv2.COLOR_RGB2GRAY), cv2.COLOR_GRAY2RGB)

    # Brightness/contrast: out = |alpha * in + beta|, saturated to uint8.
    if random.random() < p_bright:
        contrast = random.uniform(*alpha_bounds)
        offset = random.randint(-25, 25)
        image = cv2.convertScaleAbs(image, alpha=contrast, beta=offset)

    # Gamma correction through a 256-entry lookup table.
    if random.random() < p_gamma:
        exponent = 1.0 / random.uniform(0.8, 1.2)
        lut = np.array(
            [((level / 255.0) ** exponent) * 255 for level in np.arange(0, 256)]
        ).astype("uint8")
        image = cv2.LUT(image, lut)

    # Gaussian noise, added in int16 so the sum cannot wrap before clipping.
    if random.random() < p_noise:
        jitter = np.random.normal(0, 7, image.shape).astype(np.int16)
        image = np.clip(image.astype(np.int16) + jitter, 0, 255).astype(np.uint8)

    return image

def create_balanced_dataset(X, y, aug_strength='medium', target_count=None):
    """Create a class-balanced dataset by topping up each class with augmented copies.

    Every class keeps its original images and then receives randomly chosen,
    augmented copies until it holds ``max(target_count, largest class size)``
    samples.

    Args:
        X: Array-like of images, indexable along the first axis.
        y: Sequence of class labels aligned with ``X``.
        aug_strength: Preset name forwarded to ``enhanced_augment_image``.
        target_count: Minimum samples per class after balancing. Defaults to
            the module-level ``AUGMENTATION_TARGET``.

    Returns:
        ``(X_balanced, y_balanced)`` as NumPy arrays, ordered class by class
        in ascending label order. Both are empty when ``y`` is empty.
    """
    X = np.asarray(X)
    labels = np.asarray(y)

    # Nothing to balance; the original crashed in max() on an empty sequence.
    if labels.size == 0:
        return np.array([]), np.array([])

    if target_count is None:
        target_count = AUGMENTATION_TARGET

    class_counts = Counter(labels.tolist())
    per_class_target = max(target_count, max(class_counts.values()))

    X_balanced = []
    y_balanced = []

    # Iterate over the labels that are actually present. The previous
    # range(len(set(y))) silently dropped classes whenever label ids were not
    # exactly 0..k-1 (e.g. a class missing from this split).
    for class_label in sorted(class_counts):
        class_images = X[labels == class_label]
        current_count = len(class_images)

        # Add originals
        X_balanced.extend(class_images)
        y_balanced.extend([class_label] * current_count)

        # Add augmented copies of random originals to reach the target
        for _ in range(per_class_target - current_count):
            orig_idx = random.randint(0, current_count - 1)
            aug_img = enhanced_augment_image(class_images[orig_idx].copy(), aug_strength)
            X_balanced.append(aug_img)
            y_balanced.append(class_label)

    return np.array(X_balanced), np.array(y_balanced)

class DeerDataset(Dataset):
    """In-memory image dataset that yields ImageNet-normalised CHW tensors.

    Args:
        X: Images as an array (or array-like) stacked along the first axis.
        y: Integer class labels aligned with ``X``.
        training: When True, each sample is horizontally flipped with
            probability 0.5. Validation/test samples are never flipped, so
            evaluation is deterministic.
    """

    def __init__(self, X, y, training=True):
        self.X = torch.FloatTensor(X if isinstance(X, np.ndarray) else np.array(X))
        self.y = torch.LongTensor(y if isinstance(y, np.ndarray) else np.array(y))
        self.training = training
        # ImageNet channel statistics, shaped to broadcast over (3, H, W).
        self.mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
        self.std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

    def __len__(self):
        """Number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``; image is normalised."""
        image = self.X[idx].clone()
        label = self.y[idx].clone()

        # Heuristic: values above 1.0 mean the image is still on a 0-255 scale.
        if image.max() > 1.0:
            image = image / 255.0

        # Channels-last (H, W, 3) -> channels-first (3, H, W).
        if len(image.shape) == 3 and image.shape[-1] == 3:
            image = image.permute(2, 0, 1)

        # Random horizontal flip is a training-time augmentation only. The
        # condition used to be inverted (`not self.training`), which randomly
        # flipped validation/test samples and never flipped training samples.
        if self.training and random.random() < 0.5:
            image = torch.flip(image, [2])

        image = (image - self.mean) / self.std
        return image, label

class GhostNetHyperparameterTuner:
    """Random-search hyperparameter tuner for a timm ``ghostnet_100`` classifier.

    For each sampled hyperparameter combination the tuner builds a balanced,
    augmented training set, trains a freshly created model with early
    stopping on validation accuracy, evaluates the best weights on the test
    set with flip test-time augmentation, and saves a checkpoint.

    Args:
        num_classes: Number of output classes.
        save_dir: Directory for checkpoints and the results JSON. When None,
            a timestamped ``ghostnet_tuning_<YYYYmmdd_HHMMSS>`` directory is
            used. The directory is created if missing.
    """

    # Parameter-name prefixes frozen for freeze_layers = 1, 2, 3, 4 (cumulative).
    _FREEZE_PATTERNS = (
        ('conv_stem',),
        ('conv_stem', 'blocks.0'),
        ('conv_stem', 'blocks.0', 'blocks.1'),
        ('conv_stem', 'blocks.0', 'blocks.1', 'blocks.2'),
    )

    def __init__(self, num_classes, save_dir=None):
        self.num_classes = num_classes
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        if save_dir is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.save_dir = f"ghostnet_tuning_{timestamp}"
        else:
            self.save_dir = save_dir

        os.makedirs(self.save_dir, exist_ok=True)

        print(f"Using device: {self.device}")
        if torch.cuda.is_available():
            print(f"GPU: {torch.cuda.get_device_name()}")
            torch.backends.cudnn.benchmark = True

    def create_ghostnet_model(self, dropout=0.3, freeze_layers=3):
        """Create a pretrained GhostNet with frozen early layers and a dropout head.

        Args:
            dropout: Dropout probability placed in front of the new linear classifier.
            freeze_layers: 1-based index into the freeze patterns (1 = stem
                only ... 4 = stem + blocks.0-2). Any value outside 1..4
                freezes nothing.

        Returns:
            The model, moved to ``self.device``.
        """
        model = timm.create_model('ghostnet_100', pretrained=True, num_classes=self.num_classes)

        # The lower bound matters: without it freeze_layers=0 indexed pattern
        # [-1] and froze the *largest* set of layers.
        if 1 <= freeze_layers <= len(self._FREEZE_PATTERNS):
            # Match whole module prefixes ('blocks.1.') rather than substrings,
            # so 'blocks.1' can never also match e.g. 'blocks.10'.
            frozen_prefixes = tuple(
                f"{prefix}." for prefix in self._FREEZE_PATTERNS[freeze_layers - 1]
            )
            for name, param in model.named_parameters():
                if name.startswith(frozen_prefixes):
                    param.requires_grad = False

        # Replace classifier with custom dropout
        if hasattr(model, 'classifier'):
            in_features = model.classifier.in_features
            model.classifier = nn.Sequential(
                nn.Dropout(dropout),
                nn.Linear(in_features, self.num_classes)
            )

        return model.to(self.device)

    def get_optimizer(self, model, opt_type, backbone_lr, classifier_lr, weight_decay):
        """Create an optimizer with separate learning rates for backbone and head.

        Only parameters with ``requires_grad`` set are optimised. Parameters
        whose name contains ``'classifier'`` form the second group; everything
        else forms the first.

        Args:
            model: Model whose trainable parameters are optimised.
            opt_type: ``'adamw'`` or ``'sgd'`` (SGD uses momentum 0.9).
            backbone_lr: Learning rate of the first (backbone) group.
            classifier_lr: Learning rate of the second (classifier) group.
            weight_decay: Weight decay applied to both groups.

        Raises:
            ValueError: If ``opt_type`` is not recognised.
        """
        backbone_params = []
        classifier_params = []

        for name, param in model.named_parameters():
            if not param.requires_grad:
                continue
            if 'classifier' in name:
                classifier_params.append(param)
            else:
                backbone_params.append(param)

        param_groups = [
            {'params': backbone_params, 'lr': backbone_lr},
            {'params': classifier_params, 'lr': classifier_lr}
        ]

        if opt_type == 'adamw':
            return optim.AdamW(param_groups, weight_decay=weight_decay)
        elif opt_type == 'sgd':
            return optim.SGD(param_groups, weight_decay=weight_decay, momentum=0.9)
        else:
            raise ValueError(f"Unknown optimizer: {opt_type}")

    def get_scheduler(self, optimizer, scheduler_type, max_epochs):
        """Create the learning-rate scheduler.

        Args:
            optimizer: Optimizer to schedule.
            scheduler_type: ``'cosine'`` or ``'plateau'``.
            max_epochs: Cosine period (``T_max``); unused for ``'plateau'``.

        Raises:
            ValueError: If ``scheduler_type`` is not recognised.
        """
        if scheduler_type == 'cosine':
            return optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=max_epochs, eta_min=1e-6)
        elif scheduler_type == 'plateau':
            # The scheduler is stepped with validation *accuracy*, so a higher
            # value is an improvement. The default mode='min' treated rising
            # accuracy as a plateau and kept halving the learning rate.
            return optim.lr_scheduler.ReduceLROnPlateau(
                optimizer, mode='max', patience=10, factor=0.5
            )
        else:
            raise ValueError(f"Unknown scheduler: {scheduler_type}")

    @staticmethod
    def _snapshot_state(model):
        """Return a copy of the model's state dict that is detached from the live weights."""
        # state_dict().copy() is only a shallow copy: its tensors alias the
        # live parameters and keep changing as training continues.
        return {key: value.detach().clone() for key, value in model.state_dict().items()}

    def _evaluate(self, model, loader, use_tta=False):
        """Return accuracy in percent on ``loader``; optionally average with a horizontal flip."""
        model.eval()
        correct = 0
        total = 0

        with torch.no_grad():
            for images, labels in loader:
                images, labels = images.to(self.device), labels.to(self.device)
                outputs = model(images)
                if use_tta:
                    # Test time augmentation: average logits with the flipped batch.
                    outputs = (outputs + model(torch.flip(images, [3]))) / 2

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        return 100 * correct / total if total else 0.0

    def train_with_hyperparams(self, train_loader, val_loader, test_loader, hyperparams,
                               combo_num, max_epochs=80, patience=20):
        """Train one model with the given hyperparameters and evaluate it.

        Args:
            train_loader: Iterable of ``(images, labels)`` training batches.
            val_loader: Validation batches, used for model selection and early stopping.
            test_loader: Test batches, evaluated once with flip TTA.
            hyperparams: Dict with keys ``dropout``, ``freeze_layers``,
                ``optimizer``, ``backbone_lr``, ``classifier_lr``,
                ``weight_decay``, ``scheduler`` and ``label_smoothing``.
            combo_num: Index of the combination (currently unused here).
            max_epochs: Maximum number of training epochs.
            patience: Epochs without validation improvement before stopping.

        Returns:
            ``(model, best_val_acc, test_acc)`` where the model carries the
            weights of its best validation epoch and accuracies are percentages.
        """
        model = self.create_ghostnet_model(
            dropout=hyperparams['dropout'],
            freeze_layers=hyperparams['freeze_layers']
        )

        optimizer = self.get_optimizer(
            model, hyperparams['optimizer'],
            hyperparams['backbone_lr'], hyperparams['classifier_lr'],
            hyperparams['weight_decay']
        )

        scheduler = self.get_scheduler(optimizer, hyperparams['scheduler'], max_epochs)
        criterion = nn.CrossEntropyLoss(label_smoothing=hyperparams['label_smoothing'])
        on_cuda = self.device.type == 'cuda'

        best_val_acc = 0.0
        patience_counter = 0
        best_state = None

        for epoch in range(max_epochs):
            # Training
            model.train()
            train_correct = 0
            train_total = 0

            for images, labels in train_loader:
                images, labels = images.to(self.device), labels.to(self.device)
                optimizer.zero_grad()

                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                _, predicted = torch.max(outputs, 1)
                train_total += labels.size(0)
                train_correct += (predicted == labels).sum().item()

            # Validation (no TTA, so model selection sees the plain model)
            val_acc = self._evaluate(model, val_loader)
            train_acc = 100 * train_correct / train_total if train_total else 0.0

            if val_acc > best_val_acc:
                best_val_acc = val_acc
                patience_counter = 0
                best_state = self._snapshot_state(model)
            else:
                patience_counter += 1

            # Update scheduler (plateau scheduling follows validation accuracy)
            if hyperparams['scheduler'] == 'plateau':
                scheduler.step(val_acc)
            else:
                scheduler.step()

            # Print progress every 20 epochs
            if epoch % 20 == 0 and epoch > 0:
                print(f"    Epoch {epoch}: Train {train_acc:.1f}%, Val {val_acc:.1f}%")

            if patience_counter >= patience:
                print(f"    Early stopping at epoch {epoch}")
                break

            if on_cuda and epoch % 5 == 0:
                torch.cuda.empty_cache()

        # Load best weights and evaluate on test
        if best_state is not None:
            model.load_state_dict(best_state)

        test_acc = self._evaluate(model, test_loader, use_tta=True)
        return model, best_val_acc, test_acc

    def generate_hyperparameter_combinations(self):
        """Randomly pick up to ``MAX_COMBINATIONS`` settings from ``HYPERPARAMETER_GRID``.

        Returns:
            A list of dicts, one per selected combination, keyed like the grid.
        """
        keys = list(HYPERPARAMETER_GRID.keys())
        values = list(HYPERPARAMETER_GRID.values())
        all_combinations = list(itertools.product(*values))

        # Shuffle and limit
        random.shuffle(all_combinations)
        selected_combinations = all_combinations[:MAX_COMBINATIONS]

        return [dict(zip(keys, combo)) for combo in selected_combinations]

    def tune_hyperparameters(self, X_train, y_train, X_val, y_val, X_test, y_test):
        """Main hyperparameter tuning loop.

        Trains one model per sampled combination, saves a checkpoint for each
        (plus a ``ghostnet_best_val_*`` checkpoint whenever validation accuracy
        improves) and writes all results to ``hyperparameter_results.json``
        inside ``self.save_dir``. A combination that raises is reported and
        skipped.

        Returns:
            A list of result dicts with keys ``combination``, ``hyperparams``,
            ``val_accuracy`` and ``test_accuracy``.
        """
        combinations = self.generate_hyperparameter_combinations()
        total = len(combinations)

        print(f"Starting GhostNet hyperparameter tuning...")
        print(f"Testing {total} hyperparameter combinations")

        # Validation and test data do not depend on the hyperparameters, so
        # their datasets are built once; only the loaders vary with batch size.
        val_dataset = DeerDataset(X_val, y_val, training=False)
        test_dataset = DeerDataset(X_test, y_test, training=False)

        results = []
        best_val_acc = 0.0
        on_cuda = self.device.type == 'cuda'

        for i, hyperparams in enumerate(combinations, 1):
            print(f"\n[{i:2d}/{total}] Testing combination {i}")
            print(f"  Optimizer: {hyperparams['optimizer']}, LR: {hyperparams['backbone_lr']}/{hyperparams['classifier_lr']}")
            print(f"  Batch: {hyperparams['batch_size']}, Dropout: {hyperparams['dropout']}, Freeze: {hyperparams['freeze_layers']}")

            try:
                # Create the training set with the current augmentation strength
                X_train_aug, y_train_aug = create_balanced_dataset(
                    X_train, y_train, hyperparams['augmentation_strength']
                )
                train_dataset = DeerDataset(X_train_aug, y_train_aug, training=True)

                batch_size = hyperparams['batch_size']
                train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
                val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
                test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

                model, val_acc, test_acc = self.train_with_hyperparams(
                    train_loader, val_loader, test_loader, hyperparams, i
                )

                result = {
                    'combination': i,
                    'hyperparams': hyperparams,
                    'val_accuracy': val_acc,
                    'test_accuracy': test_acc
                }
                results.append(result)

                print(f"  ✓ Val: {val_acc:.1f}%, Test: {test_acc:.1f}%")

                checkpoint = {
                    'model_state_dict': model.state_dict(),
                    'hyperparams': hyperparams,
                    'val_accuracy': val_acc,
                    'test_accuracy': test_acc,
                    'combination': i
                }

                # Save every model with unique name based on test accuracy
                model_name = f"ghostnet_combo_{i:02d}_test_{test_acc:.1f}.pth"
                torch.save(checkpoint, os.path.join(self.save_dir, model_name))
                print(f"  💾 Model saved: {model_name}")

                # Save best model so far (selected on validation accuracy)
                if val_acc > best_val_acc:
                    best_val_acc = val_acc
                    best_path = os.path.join(self.save_dir, f"ghostnet_best_val_{val_acc:.1f}.pth")
                    torch.save(checkpoint, best_path)
                    print(f"  💾 New best model saved: {val_acc:.1f}%")

            except Exception as e:
                # Keep tuning, but show the full error so failures are diagnosable.
                print(f"  ✗ Combination {i} failed: {type(e).__name__}: {e}")
                continue
            finally:
                if on_cuda:
                    torch.cuda.empty_cache()

        # Save all results
        results_path = os.path.join(self.save_dir, "hyperparameter_results.json")
        with open(results_path, 'w') as f:
            json.dump(results, f, indent=2)

        return results

def evaluate_model(model, test_loader, device):
    """Return test accuracy (percent) using horizontal-flip test-time augmentation.

    Each batch is scored twice - as given and flipped along dim 3 - and the two
    sets of logits are averaged before the predicted class is taken.

    Args:
        model: Network to evaluate; switched to eval mode.
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.
    """
    model.eval()
    hits = 0
    seen = 0

    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            # Test time augmentation: plain pass + mirrored pass, averaged.
            plain_logits = model(batch_images)
            mirrored_logits = model(torch.flip(batch_images, [3]))
            averaged = (plain_logits + mirrored_logits) / 2

            predictions = torch.max(averaged, 1).indices
            seen += batch_labels.size(0)
            hits += (predictions == batch_labels).sum().item()

    return 100 * hits / seen

def main(seed=42):
    """Run the full GhostNet tuning pipeline and print a ranked summary.

    Loads the images, maps ages to class indices, makes a stratified
    train/val/test split, runs the hyperparameter search and prints the top
    results sorted by validation accuracy.

    Args:
        seed: Seed for Python's ``random``, NumPy and PyTorch so the sampled
            hyperparameter combinations and augmentations are repeatable.
            Pass None to leave the generators unseeded.
    """
    print("GhostNet Hyperparameter Tuning for Deer Age Prediction")
    print("=" * 60)

    # The data split was already pinned with random_state=42, but the
    # combination sampling and augmentation were not reproducible.
    if seed is not None:
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)

    start_time = time.time()

    # Load combined data
    images, ages, sources = load_combined_data()

    # Without data the stratified split below fails with an obscure error.
    if len(images) == 0:
        print("No images were loaded - check the image folder paths.")
        return

    # Create label mapping (sorted ages -> contiguous class indices)
    unique_ages = sorted(list(set(ages)))
    label_mapping = {age: i for i, age in enumerate(unique_ages)}
    y_indices = np.array([label_mapping[age] for age in ages])

    print(f"\nClasses: {len(unique_ages)}")
    print(f"Label mapping: {label_mapping}")

    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        images, y_indices, test_size=0.2, random_state=42, stratify=y_indices
    )

    # Further split training into train/val
    X_train_final, X_val, y_train_final, y_val = train_test_split(
        X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
    )

    print(f"\nData split:")
    print(f"Train: {len(X_train_final)} images")
    print(f"Val: {len(X_val)} images")
    print(f"Test: {len(X_test)} images")

    # Initialize tuner
    tuner = GhostNetHyperparameterTuner(num_classes=len(unique_ages))

    # Run hyperparameter tuning
    results = tuner.tune_hyperparameters(X_train_final, y_train_final, X_val, y_val, X_test, y_test)

    # Analyze results
    if not results:
        print("No successful combinations found!")
        return

    # Rank by validation accuracy; test accuracy is reported, never selected on.
    sorted_results = sorted(results, key=lambda x: x['val_accuracy'], reverse=True)

    elapsed = (time.time() - start_time) / 60

    print("\n" + "=" * 60)
    print("GHOSTNET HYPERPARAMETER TUNING RESULTS")
    print("=" * 60)
    print(f"{'Rank':<4} {'Combination':<6} {'Validation':<12} {'Test':<8} {'Key Hyperparams'}")
    print('-' * 75)

    for i, result in enumerate(sorted_results[:10], 1):  # Top 10
        hp = result['hyperparams']
        key_params = f"opt={hp['optimizer']}, lr={hp['backbone_lr']}, bs={hp['batch_size']}, drop={hp['dropout']}"
        print(f"{i:2d}. {result['combination']:4d}       {result['val_accuracy']:5.1f}%      {result['test_accuracy']:5.1f}%    {key_params}")

    best_result = sorted_results[0]
    print(f"\n🏆 BEST HYPERPARAMETERS:")
    for key, value in best_result['hyperparams'].items():
        print(f"  {key}: {value}")

    print(f"\n📊 PERFORMANCE:")
    print(f"  Best Validation: {best_result['val_accuracy']:.1f}%")
    # Test accuracy of the combination with the best validation accuracy.
    print(f"  Best Test: {best_result['test_accuracy']:.1f}%")
    print(f"  Tuning Time: {elapsed:.1f} minutes")
    print(f"  Results saved to: {tuner.save_dir}")

# Entry point: start the tuning run when this code is executed as the main
# module (which is the case when the notebook cell is run, as the output below shows).
if __name__ == "__main__":
    main()

GhostNet Hyperparameter Tuning for Deer Age Prediction
Loading color images...
Loaded 200 color images
Loading grayscale images...
Loaded 38 grayscale images
Total images: 238
Final dataset: 238 images
Age distribution: {2.5: 40, 3.5: 50, 4.5: 56, 5.5: 60, 1.5: 32}

Classes: 5
Label mapping: {1.5: 0, 2.5: 1, 3.5: 2, 4.5: 3, 5.5: 4}

Data split:
Train: 152 images
Val: 38 images
Test: 48 images
Using device: cuda
GPU: NVIDIA GeForce RTX 2060
Starting GhostNet hyperparameter tuning...
Testing 30 hyperparameter combinations

[ 1/30] Testing combination 1
  Optimizer: sgd, LR: 0.0005/0.002
  Batch: 12, Dropout: 0.2, Freeze: 2
