# Hyperparameter Optimization for Promoter CNN

This notebook contains comprehensive hyperparameter optimization for the Promoter CNN model using multiple search strategies:
- Random Search
- Grid Search  
- Bayesian Optimization
- Cross-Validation

The optimized hyperparameters can then be used in the main promoter_cnn.ipynb notebook for final training.


In [2]:
#!/usr/bin/env python3
"""
Required imports for hyperparameter optimization
"""

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Tuple, Dict
import warnings
warnings.filterwarnings('ignore')

# Import tuning specific libraries
import random
import time
import itertools
from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional, Tuple
import json
import pickle
from datetime import datetime
from sklearn.model_selection import KFold

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"MPS available: {getattr(torch.backends, 'mps', None) is not None and torch.backends.mps.is_available()}")


PyTorch version: 2.5.1
CUDA available: False
MPS available: True


## Dataset and Model Classes

Import the dataset class and model architecture from the main implementation.


In [3]:
class PromoterDataset(Dataset):
    """Dataset for promoter sequences only"""
    
    def __init__(self, sequences: list, targets: np.ndarray):
        self.sequences = sequences
        self.targets = targets
        
        # DNA encoding dictionary
        self.dna_dict = {'A': 0, 'T': 1, 'G': 2, 'C': 3, 'N': 4}
        
    def __len__(self):
        return len(self.sequences)
    
    def encode_sequence(self, sequence: str, max_length: int = 600) -> np.ndarray:
        """Encode DNA sequence to numerical representation"""
        # Truncate or pad sequence
        if len(sequence) > max_length:
            sequence = sequence[:max_length]
        else:
            sequence = sequence + 'N' * (max_length - len(sequence))
        
        # Convert to numerical encoding
        encoded = np.array([self.dna_dict.get(base.upper(), 4) for base in sequence])
        
        # One-hot encode
        one_hot = np.zeros((max_length, 5))
        one_hot[np.arange(max_length), encoded] = 1
        
        return one_hot.T  # Shape: (5, max_length) for Conv1d
    
    def __getitem__(self, idx):
        sequence = self.encode_sequence(self.sequences[idx])
        target = self.targets[idx].astype(np.float32)
        total = float(np.sum(target))
        if total <= 0:
            target = np.ones_like(target, dtype=np.float32) / target.shape[0]
        else:
            target = target / total
        
        return {
            'sequence': torch.FloatTensor(sequence),
            'target': torch.FloatTensor(target)
        }


In [4]:
import torch.nn as nn

class PromoterCNN(nn.Module):
    """CNN for predicting 5-component probabilities with configurable depth."""
    
    def __init__(self, sequence_length: int = 600, num_blocks: int = 2, base_channels: int = 64, dropout: float = 0.3):
        super(PromoterCNN, self).__init__()
        assert num_blocks >= 1
        
        conv_layers = []
        in_ch = 5
        out_ch = base_channels
        for i in range(num_blocks):
            k = 11 if i == 0 else 7
            p = 5 if i == 0 else 3
            conv_layers.append(nn.Conv1d(in_channels=in_ch, out_channels=out_ch, kernel_size=k, padding=p))
            conv_layers.append(nn.ReLU())
            if i < min(2, num_blocks):
                conv_layers.append(nn.MaxPool1d(kernel_size=4))
            conv_layers.append(nn.Dropout(dropout))
            in_ch = out_ch
            out_ch = min(out_ch * 2, base_channels * (2 ** (num_blocks - 1)))
        conv_layers.append(nn.AdaptiveAvgPool1d(1))
        self.sequence_conv = nn.Sequential(*conv_layers)
        
        final_ch = in_ch
        hidden = max(32, final_ch // 2)
        self.classifier = nn.Sequential(
            nn.Linear(final_ch, hidden),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, 5)  # 5 components
        )
        
    def forward(self, sequence):
        x = self.sequence_conv(sequence)
        x = x.squeeze(-1)
        logits = self.classifier(x)
        return logits


## Data Loading and Training Functions


In [5]:
def load_and_prepare_data(file_path: str) -> Tuple[list, np.ndarray]:
    """Load and prepare data for training"""
    print("Loading data...")
    df = pd.read_csv(file_path)
    
    # Component probabilities as targets
    prob_cols = ['Component_1_Probability', 'Component_2_Probability', 
                'Component_3_Probability', 'Component_4_Probability', 'Component_5_Probability']
    
    # Filter out rows with missing sequence data or target data
    print(f"Initial data shape: {df.shape}")
    
    # Remove rows with NaN in ProSeq column
    df = df.dropna(subset=['ProSeq'])
    print(f"After removing missing sequences: {df.shape}")
    
    # Remove rows with NaN in any probability column
    df = df.dropna(subset=prob_cols)
    print(f"After removing missing probabilities: {df.shape}")
    
    # Extract features and targets
    sequences = df['ProSeq'].tolist()
    targets = df[prob_cols].values
    
    # Additional validation - ensure all sequences are strings
    valid_sequences = []
    valid_targets = []
    
    for i, seq in enumerate(sequences):
        if isinstance(seq, str) and len(seq) > 0:
            valid_sequences.append(seq)
            valid_targets.append(targets[i])
    
    sequences = valid_sequences
    targets = np.array(valid_targets)
    
    print(f"Final dataset: {len(sequences)} samples")
    print(f"Average sequence length: {np.mean([len(seq) for seq in sequences]):.1f}")
    print(f"Min sequence length: {min([len(seq) for seq in sequences])}")
    print(f"Max sequence length: {max([len(seq) for seq in sequences])}")
    
    return sequences, targets


In [6]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train for one epoch"""
    model.train()
    total_loss = 0.0
    
    for batch in train_loader:
        sequences = batch['sequence'].to(device)
        targets = batch['target'].to(device)
        
        optimizer.zero_grad()
        logits = model(sequences)
        log_probs = F.log_softmax(logits, dim=1)
        loss = criterion(log_probs, targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(train_loader)

def validate_epoch(model, val_loader, criterion, device):
    """Validate for one epoch"""
    model.eval()
    total_loss = 0.0
    
    with torch.no_grad():
        for batch in val_loader:
            sequences = batch['sequence'].to(device)
            targets = batch['target'].to(device)
            
            logits = model(sequences)
            log_probs = F.log_softmax(logits, dim=1)
            loss = criterion(log_probs, targets)
            total_loss += loss.item()
    
    return total_loss / len(val_loader)


## Hyperparameter Tuning Framework

The comprehensive hyperparameter optimization framework with multiple search strategies.


In [7]:
@dataclass
class TrialResult:
    config: Dict[str, Any]
    val_loss: float
    val_losses: List[float] = field(default_factory=list)
    train_losses: List[float] = field(default_factory=list)
    params: int = 0
    duration_s: float = 0.0
    epochs_trained: int = 0
    cv_scores: List[float] = field(default_factory=list)
    cv_mean: float = 0.0
    cv_std: float = 0.0
    final_lr: float = 0.0


In [8]:
class HyperparameterTuner:
    """Comprehensive hyperparameter tuning framework with multiple search strategies"""
    
    def __init__(self, train_dataset, val_dataset, device, save_dir="tuning_results"):
        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.device = device
        self.save_dir = save_dir
        self.results = []
        
        # Extensive search space
        self.SEARCH_SPACE = {
            # Architecture parameters
            'depth': [1, 2, 3, 4],
            'base_channels': [8, 16, 24, 32, 48, 64],
            'dropout': [0.1, 0.2, 0.3, 0.4, 0.5],
            
            # Optimizer parameters
            'optimizer': ['adam', 'adamw', 'sgd', 'rmsprop'],
            'lr': [1e-4, 3e-4, 1e-3, 3e-3, 1e-2],
            'weight_decay': [0, 1e-6, 1e-5, 1e-4, 3e-4, 1e-3],
            
            # Training parameters
            'batch_size': [16, 32, 64, 128],
            'scheduler': ['plateau', 'cosine', 'step', 'none'],
            
            # SGD specific
            'momentum': [0.9, 0.95, 0.99],
            'nesterov': [True, False],
            
            # Scheduler specific
            'scheduler_patience': [3, 5, 8, 10],
            'scheduler_factor': [0.1, 0.3, 0.5, 0.7],
            'step_size': [10, 20, 30],
            'gamma': [0.1, 0.3, 0.5],
            
            # Loss function
            'loss_function': ['kldiv', 'mse', 'smooth_l1', 'cross_entropy']
        }
    
    def build_loaders(self, batch_size: int):
        """Build data loaders with specified batch size"""
        train_loader = DataLoader(self.train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
        val_loader = DataLoader(self.val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
        return train_loader, val_loader
    
    def create_optimizer(self, model, config):
        """Create optimizer based on configuration"""
        params = model.parameters()
        lr = config['lr']
        wd = config['weight_decay']
        
        if config['optimizer'] == 'adam':
            return optim.Adam(params, lr=lr, weight_decay=wd)
        elif config['optimizer'] == 'adamw':
            return optim.AdamW(params, lr=lr, weight_decay=wd)
        elif config['optimizer'] == 'sgd':
            momentum = config.get('momentum', 0.9)
            nesterov = config.get('nesterov', True)
            return optim.SGD(params, lr=lr, weight_decay=wd, momentum=momentum, nesterov=nesterov)
        elif config['optimizer'] == 'rmsprop':
            return optim.RMSprop(params, lr=lr, weight_decay=wd)
        else:
            return optim.Adam(params, lr=lr, weight_decay=wd)
    
    def create_scheduler(self, optimizer, config):
        """Create learning rate scheduler based on configuration"""
        scheduler_type = config.get('scheduler', 'plateau')
        
        if scheduler_type == 'plateau':
            patience = config.get('scheduler_patience', 5)
            factor = config.get('scheduler_factor', 0.5)
            return optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=patience, factor=factor)
        elif scheduler_type == 'cosine':
            return optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
        elif scheduler_type == 'step':
            step_size = config.get('step_size', 20)
            gamma = config.get('gamma', 0.5)
            return optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)
        else:
            return None
    
    def create_criterion(self, config):
        """Create loss function based on configuration"""
        loss_type = config.get('loss_function', 'kldiv')
        
        if loss_type == 'kldiv':
            return nn.KLDivLoss(reduction='batchmean')
        elif loss_type == 'mse':
            return nn.MSELoss()
        elif loss_type == 'smooth_l1':
            return nn.SmoothL1Loss()
        elif loss_type == 'cross_entropy':
            return nn.CrossEntropyLoss()
        else:
            return nn.KLDivLoss(reduction='batchmean')
    
    def run_trial(self, config: Dict[str, Any], max_epochs: int = 25, es_patience: int = 8, verbose: bool = False) -> TrialResult:
        """Run a single hyperparameter trial"""
        try:
            # Build data loaders
            train_loader, val_loader = self.build_loaders(config['batch_size'])
            
            # Create model
            model = PromoterCNN(
                num_blocks=config['depth'],
                base_channels=config['base_channels'],
                dropout=config['dropout']
            ).to(self.device)
            
            param_count = sum(p.numel() for p in model.parameters() if p.requires_grad)
            
            # Create optimizer, scheduler, and criterion
            optimizer = self.create_optimizer(model, config)
            scheduler = self.create_scheduler(optimizer, config)
            criterion = self.create_criterion(config)
            
            # Training loop
            best_val_loss = float('inf')
            bad_epochs = 0
            train_losses = []
            val_losses = []
            start_time = time.time()
            
            for epoch in range(max_epochs):
                # Train
                train_loss = train_epoch(model, train_loader, criterion, optimizer, self.device)
                
                # Validate
                val_loss = validate_epoch(model, val_loader, criterion, self.device)
                
                # Update scheduler
                if scheduler is not None:
                    if isinstance(scheduler, optim.lr_scheduler.ReduceLROnPlateau):
                        scheduler.step(val_loss)
                    else:
                        scheduler.step()
                
                train_losses.append(train_loss)
                val_losses.append(val_loss)
                
                # Early stopping
                if val_loss < best_val_loss - 1e-6:
                    best_val_loss = val_loss
                    bad_epochs = 0
                else:
                    bad_epochs += 1
                    if bad_epochs >= es_patience:
                        if verbose:
                            print(f"Early stopping at epoch {epoch+1}")
                        break
                
                if verbose and epoch % 5 == 0:
                    current_lr = optimizer.param_groups[0]['lr']
                    print(f"Epoch {epoch+1}: train={train_loss:.6f}, val={val_loss:.6f}, lr={current_lr:.2e}")
            
            duration = time.time() - start_time
            final_lr = optimizer.param_groups[0]['lr']
            
            return TrialResult(
                config=config,
                val_loss=best_val_loss,
                val_losses=val_losses,
                train_losses=train_losses,
                params=param_count,
                duration_s=duration,
                epochs_trained=len(train_losses),
                final_lr=final_lr
            )
            
        except Exception as e:
            print(f"Trial failed with config {config}: {e}")
            return TrialResult(
                config=config,
                val_loss=float('inf'),
                params=0,
                duration_s=0.0
            )
    
    def random_search(self, num_trials: int = 50, seed: int = 42) -> List[TrialResult]:
        """Perform random search over hyperparameter space"""
        print(f"🔍 Starting Random Search with {num_trials} trials...")
        random.seed(seed)
        results = []
        
        for trial in range(1, num_trials + 1):
            # Sample configuration
            config = {key: random.choice(values) for key, values in self.SEARCH_SPACE.items()}
            
            # Clean up SGD-specific parameters if not using SGD
            if config['optimizer'] != 'sgd':
                config.pop('momentum', None)
                config.pop('nesterov', None)
            
            # Clean up scheduler-specific parameters
            scheduler_type = config.get('scheduler', 'plateau')
            if scheduler_type != 'plateau':
                config.pop('scheduler_patience', None)
                config.pop('scheduler_factor', None)
            if scheduler_type != 'step':
                config.pop('step_size', None)
                config.pop('gamma', None)
            
            # Run trial
            result = self.run_trial(config)
            results.append(result)
            
            print(f"Trial {trial:3d}/{num_trials}: val_loss={result.val_loss:.6f}, "
                  f"params={result.params:,}, time={result.duration_s:.1f}s")
            
            # Print best config so far
            if trial % 10 == 0:
                best_so_far = min(results, key=lambda r: r.val_loss)
                print(f"   Best so far: {best_so_far.val_loss:.6f} with {best_so_far.params:,} params")
        
        return results
    
    def save_results(self, results: List[TrialResult], filename: str = None):
        """Save tuning results to file"""
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"tuning_results_{timestamp}.pkl"
        
        with open(filename, 'wb') as f:
            pickle.dump(results, f)
        print(f"💾 Results saved to {filename}")
    
    def analyze_results(self, results: List[TrialResult], top_k: int = 5):
        """Analyze tuning results"""
        if not results:
            print("No results to analyze!")
            return
        
        # Sort by validation loss
        results_sorted = sorted(results, key=lambda r: r.val_loss if r.val_loss != float('inf') else 999)
        valid_results = [r for r in results_sorted if r.val_loss != float('inf')]
        
        print(f"📊 HYPERPARAMETER TUNING ANALYSIS")
        print(f"{'='*60}")
        print(f"Total trials: {len(results)}")
        print(f"Successful trials: {len(valid_results)}")
        print(f"Failed trials: {len(results) - len(valid_results)}")
        
        if not valid_results:
            print("No successful trials to analyze!")
            return
        
        # Top configurations
        print(f"\n🏆 TOP {min(top_k, len(valid_results))} CONFIGURATIONS:")
        print("-" * 60)
        for i, result in enumerate(valid_results[:top_k]):
            print(f"\nRank {i+1}:")
            print(f"  Validation Loss: {result.val_loss:.6f}")
            print(f"  Parameters: {result.params:,}")
            print(f"  Training Time: {result.duration_s:.1f}s")
            print(f"  Epochs: {result.epochs_trained}")
            print(f"  Config: {result.config}")
        
        return valid_results[:top_k]


## Load Data and Setup


In [9]:
# Load data
sequences, targets = load_and_prepare_data('../../data/processed/ProSeq_with_5component_analysis.csv')

# Show data statistics
print(f"\nData statistics:")
print(f"Total samples: {len(sequences)}")
print(f"Target shape: {targets.shape}")
for i in range(5):
    print(f"  Component {i+1}: {targets[:, i].min():.4f} - {targets[:, i].max():.4f} (mean: {targets[:, i].mean():.4f})")

# Split data (stratified by dominant component)
labels = np.argmax(targets, axis=1)
train_seq, test_seq, train_targets, test_targets = train_test_split(
    sequences, targets, test_size=0.2, random_state=42, stratify=labels
)

train_labels = np.argmax(train_targets, axis=1)
train_seq, val_seq, train_targets, val_targets = train_test_split(
    train_seq, train_targets, test_size=0.2, random_state=42, stratify=train_labels
)

print(f"\nData splits:")
print(f"  Train: {len(train_seq)}, Val: {len(val_seq)}, Test: {len(test_seq)}")

# Create datasets
train_dataset = PromoterDataset(train_seq, train_targets)
val_dataset = PromoterDataset(val_seq, val_targets)

# Device selection
if getattr(torch.backends, 'mps', None) is not None and torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")


Loading data...
Initial data shape: (8307, 17)
After removing missing sequences: (8304, 17)
After removing missing probabilities: (8304, 17)
Final dataset: 8304 samples
Average sequence length: 600.0
Min sequence length: 472
Max sequence length: 600

Data statistics:
Total samples: 8304
Target shape: (8304, 5)
  Component 1: 0.0000 - 1.0000 (mean: 0.0878)
  Component 2: 0.0000 - 0.9414 (mean: 0.3198)
  Component 3: 0.0000 - 0.9999 (mean: 0.1145)
  Component 4: 0.0000 - 0.9296 (mean: 0.2135)
  Component 5: 0.0000 - 0.9702 (mean: 0.2644)

Data splits:
  Train: 5314, Val: 1329, Test: 1661
Using device: mps


## Run Hyperparameter Optimization

Execute the comprehensive hyperparameter search using multiple strategies.


In [10]:
# Initialize the tuner
tuner = HyperparameterTuner(train_dataset, val_dataset, device)

# Configuration for tuning runs
RANDOM_SEARCH_TRIALS = 20
RANDOM_SEED = 42

print(f"🎯 STARTING HYPERPARAMETER OPTIMIZATION")
print(f"Random Search Trials: {RANDOM_SEARCH_TRIALS}")
print(f"Random Seed: {RANDOM_SEED}")

# Random search to explore the space
print(f"\n🎲 PHASE 1: RANDOM SEARCH")
print("="*50)
random_results = tuner.random_search(num_trials=RANDOM_SEARCH_TRIALS, seed=RANDOM_SEED)
gradient_results = tuner.gradient_search(num_trials=RANDOM_SEARCH_TRIALS, seed=RANDOM_SEED)

# Analyze results
print(f"\n📊 ANALYSIS")
print("="*50)
best_configs = tuner.analyze_results(random_results + gradient_results, top_k=3)

# Save results
tuner.save_results(random_results, "hyperparameter_tuning_results.pkl")

# Extract the best configuration
if best_configs:
    best_config = best_configs[0]
    print(f"\n🎯 BEST CONFIGURATION:")
    print(f"   Validation Loss: {best_config.val_loss:.6f}")
    print(f"   Configuration: {best_config.config}")
    
    # Save best config as JSON for easy loading in main notebook
    import json
    with open('best_hyperparameters.json', 'w') as f:
        json.dump(best_config.config, f, indent=2)
    print(f"\n💾 Best configuration saved to 'best_hyperparameters.json'")
else:
    print("No successful configurations found!")


🎯 STARTING HYPERPARAMETER OPTIMIZATION
Random Search Trials: 20
Random Seed: 42

🎲 PHASE 1: RANDOM SEARCH
🔍 Starting Random Search with 20 trials...
Trial   1/20: val_loss=3.384151, params=901, time=36.9s
Trial   2/20: val_loss=1.309438, params=4,421, time=20.1s
Trial   3/20: val_loss=1.309438, params=901, time=15.3s
Trial   4/20: val_loss=1.309438, params=183,269, time=33.1s
Trial   5/20: val_loss=3.384134, params=11,189, time=27.2s
Trial   6/20: val_loss=1.505727, params=6,197, time=32.3s
Trial   7/20: val_loss=1.507379, params=11,189, time=13.1s
Trial   8/20: val_loss=1.113995, params=901, time=19.6s
Trial   9/20: val_loss=3.384212, params=324,229, time=44.6s
Trial  10/20: val_loss=1.111563, params=11,189, time=18.0s
   Best so far: 1.111563 with 11,189 params
Trial  11/20: val_loss=3.384222, params=5,829, time=17.8s
Trial  12/20: val_loss=3.384753, params=82,245, time=33.1s
Trial  13/20: val_loss=3.384081, params=2,069, time=80.1s
Trial  14/20: val_loss=1.110731, params=84,965, tim

AttributeError: 'HyperparameterTuner' object has no attribute 'gradient_search'