In [None]:
import numpy as np
from tensorflow import keras
from tensorflow.keras.datasets import mnist
import matplotlib.pyplot as plt
import time
import json

# 載入並預處理MNIST數據
def load_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 784).astype('float32') / 255.0
    y_train = keras.utils.to_categorical(y_train, 10)
    y_test = keras.utils.to_categorical(y_test, 10)
    return x_train, y_train, x_test, y_test

# MLP類別（支援SGD和ES）
class MLP:
    def __init__(self, layers):
        self.layers = layers
        self.weights = []
        self.biases = []
        for i in range(len(layers) - 1):
            w = np.random.randn(layers[i], layers[i+1]) * 0.1
            b = np.zeros((1, layers[i+1]))
            self.weights.append(w)
            self.biases.append(b)
    
    def relu(self, x):
        return np.maximum(0, x)
    
    def softmax(self, x):
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)
    
    def forward(self, x):
        a = x
        for i in range(len(self.weights) - 1):
            z = np.dot(a, self.weights[i]) + self.biases[i]
            a = self.relu(z)
        z = np.dot(a, self.weights[-1]) + self.biases[-1]
        a = self.softmax(z)
        return a
    
    def get_params(self):
        params = []
        for w, b in zip(self.weights, self.biases):
            params.append(w.flatten())
            params.append(b.flatten())
        return np.concatenate(params)
    
    def set_params(self, params):
        idx = 0
        for i in range(len(self.weights)):
            w_shape = self.weights[i].shape
            b_shape = self.biases[i].shape
            w_size = np.prod(w_shape)
            b_size = np.prod(b_shape)
            self.weights[i] = params[idx:idx+w_size].reshape(w_shape)
            idx += w_size
            self.biases[i] = params[idx:idx+b_size].reshape(b_shape)
            idx += b_size
    
    def evaluate(self, x, y):
        pred = self.forward(x)
        acc = np.mean(np.argmax(pred, axis=1) == np.argmax(y, axis=1))
        return acc
    
    def compute_loss(self, x, y):
        pred = self.forward(x)
        pred = np.clip(pred, 1e-10, 1 - 1e-10)
        loss = -np.mean(np.sum(y * np.log(pred), axis=1))
        return loss
    
    def backward(self, x, y):
        """反向傳播計算梯度"""
        m = x.shape[0]
        activations = [x]
        z_values = []
        
        # 前向傳播並保存中間值
        a = x
        for i in range(len(self.weights) - 1):
            z = np.dot(a, self.weights[i]) + self.biases[i]
            z_values.append(z)
            a = self.relu(z)
            activations.append(a)
        
        z = np.dot(a, self.weights[-1]) + self.biases[-1]
        z_values.append(z)
        output = self.softmax(z)
        
        # 反向傳播
        dz = output - y
        gradients_w = []
        gradients_b = []
        
        for i in range(len(self.weights) - 1, -1, -1):
            dw = np.dot(activations[i].T, dz) / m
            db = np.sum(dz, axis=0, keepdims=True) / m
            gradients_w.insert(0, dw)
            gradients_b.insert(0, db)
            
            if i > 0:
                dz = np.dot(dz, self.weights[i].T)
                dz[z_values[i-1] <= 0] = 0  # ReLU導數
        
        return gradients_w, gradients_b
    
    def sgd_update(self, x, y, learning_rate=0.01):
        """SGD更新（使用反向傳播）"""
        grad_w, grad_b = self.backward(x, y)
        for i in range(len(self.weights)):
            self.weights[i] -= learning_rate * grad_w[i]
            self.biases[i] -= learning_rate * grad_b[i]

# 純ES優化器
class PureES:
    def __init__(self, model, population_size=50, sigma=0.1, learning_rate=0.03):
        self.model = model
        self.pop_size = population_size
        self.sigma = sigma
        self.lr = learning_rate
        self.param_size = len(model.get_params())
    
    def train(self, x_train, y_train, x_test, y_test, generations=200, sample_size=1000):
        history = {
            'train_acc': [], 'test_acc': [], 'train_loss': [],
            'time': [], 'samples_seen': []
        }
        best_params = self.model.get_params()
        total_samples = 0
        start_time = time.time()
        
        for gen in range(generations):
            idx = np.random.choice(len(x_train), sample_size, replace=False)
            x_sample = x_train[idx]
            y_sample = y_train[idx]
            
            noise = np.random.randn(self.pop_size, self.param_size)
            rewards = np.zeros(self.pop_size)
            
            for i in range(self.pop_size):
                params_try = best_params + self.sigma * noise[i]
                self.model.set_params(params_try)
                rewards[i] = self.model.evaluate(x_sample, y_sample)
            
            total_samples += self.pop_size * sample_size
            
            rewards = (rewards - np.mean(rewards)) / (np.std(rewards) + 1e-8)
            best_params += self.lr / (self.pop_size * self.sigma) * np.dot(noise.T, rewards)
            self.model.set_params(best_params)
            
            train_acc = self.model.evaluate(x_sample, y_sample)
            test_acc = self.model.evaluate(x_test, y_test)
            train_loss = self.model.compute_loss(x_sample, y_sample)
            
            history['train_acc'].append(train_acc)
            history['test_acc'].append(test_acc)
            history['train_loss'].append(train_loss)
            history['time'].append(time.time() - start_time)
            history['samples_seen'].append(total_samples)
            
            if (gen + 1) % 20 == 0:
                print(f"Gen {gen+1}/{generations} - Train: {train_acc:.4f}, "
                      f"Test: {test_acc:.4f}, Time: {history['time'][-1]:.1f}s")
        
        return history

# 純SGD優化器
class PureSGD:
    def __init__(self, model, learning_rate=0.01, batch_size=128):
        self.model = model
        self.lr = learning_rate
        self.batch_size = batch_size
    
    def train(self, x_train, y_train, x_test, y_test, epochs=200):
        history = {
            'train_acc': [], 'test_acc': [], 'train_loss': [],
            'time': [], 'samples_seen': []
        }
        total_samples = 0
        start_time = time.time()
        
        for epoch in range(epochs):
            # 打亂數據
            indices = np.random.permutation(len(x_train))
            x_shuffled = x_train[indices]
            y_shuffled = y_train[indices]
            
            num_batches = len(x_train) // self.batch_size
            
            for batch_idx in range(num_batches):
                start_idx = batch_idx * self.batch_size
                end_idx = start_idx + self.batch_size
                x_batch = x_shuffled[start_idx:end_idx]
                y_batch = y_shuffled[start_idx:end_idx]
                
                self.model.sgd_update(x_batch, y_batch, self.lr)
                total_samples += self.batch_size
            
            # 評估（使用小批次以加速）
            eval_size = 1000
            eval_idx = np.random.choice(len(x_train), eval_size, replace=False)
            train_acc = self.model.evaluate(x_train[eval_idx], y_train[eval_idx])
            test_acc = self.model.evaluate(x_test, y_test)
            train_loss = self.model.compute_loss(x_train[eval_idx], y_train[eval_idx])
            
            history['train_acc'].append(train_acc)
            history['test_acc'].append(test_acc)
            history['train_loss'].append(train_loss)
            history['time'].append(time.time() - start_time)
            history['samples_seen'].append(total_samples)
            
            if (epoch + 1) % 20 == 0:
                print(f"Epoch {epoch+1}/{epochs} - Train: {train_acc:.4f}, "
                      f"Test: {test_acc:.4f}, Time: {history['time'][-1]:.1f}s")
        
        return history

# 混合ES+SGD優化器
class HybridESGD:
    def __init__(self, model, pop_size=50, es_sigma=0.1, es_lr=0.03, 
                 sgd_lr=0.001, sgd_batch_size=128):
        self.model = model
        self.pop_size = pop_size
        self.sigma = es_sigma
        self.es_lr = es_lr
        self.sgd_lr = sgd_lr
        self.batch_size = sgd_batch_size
        self.param_size = len(model.get_params())
    
    def train(self, x_train, y_train, x_test, y_test, generations=100, 
              es_sample_size=1000):
        history = {
            'train_acc': [], 'test_acc': [], 'train_loss': [],
            'best_individual_acc': [], 'time': [], 'samples_seen': []
        }
        best_params = self.model.get_params()
        total_samples = 0
        start_time = time.time()
        
        for gen in range(generations):
            # ES階段
            idx = np.random.choice(len(x_train), es_sample_size, replace=False)
            x_sample = x_train[idx]
            y_sample = y_train[idx]
            
            noise = np.random.randn(self.pop_size, self.param_size)
            rewards = np.zeros(self.pop_size)
            all_params = []
            
            for i in range(self.pop_size):
                params_try = best_params + self.sigma * noise[i]
                all_params.append(params_try)
                self.model.set_params(params_try)
                rewards[i] = self.model.evaluate(x_sample, y_sample)
            
            total_samples += self.pop_size * es_sample_size
            
            best_idx = np.argmax(rewards)
            best_individual_params = all_params[best_idx]
            best_individual_acc = rewards[best_idx]
            history['best_individual_acc'].append(best_individual_acc)
            
            # SGD階段（1 epoch）
            self.model.set_params(best_individual_params)
            num_batches = len(x_train) // self.batch_size
            
            for batch_idx in range(num_batches):
                start_idx = batch_idx * self.batch_size
                end_idx = start_idx + self.batch_size
                x_batch = x_train[start_idx:end_idx]
                y_batch = y_train[start_idx:end_idx]
                self.model.sgd_update(x_batch, y_batch, self.sgd_lr)
                total_samples += self.batch_size
            
            best_params = self.model.get_params()
            
            # ES更新
            rewards = (rewards - np.mean(rewards)) / (np.std(rewards) + 1e-8)
            es_update = self.es_lr / (self.pop_size * self.sigma) * np.dot(noise.T, rewards)
            best_params += es_update
            self.model.set_params(best_params)
            
            # 評估
            eval_idx = np.random.choice(len(x_train), 1000, replace=False)
            train_acc = self.model.evaluate(x_train[eval_idx], y_train[eval_idx])
            test_acc = self.model.evaluate(x_test, y_test)
            train_loss = self.model.compute_loss(x_train[eval_idx], y_train[eval_idx])
            
            history['train_acc'].append(train_acc)
            history['test_acc'].append(test_acc)
            history['train_loss'].append(train_loss)
            history['time'].append(time.time() - start_time)
            history['samples_seen'].append(total_samples)
            
            if (gen + 1) % 10 == 0:
                print(f"Gen {gen+1}/{generations} - Train: {train_acc:.4f}, "
                      f"Test: {test_acc:.4f}, Time: {history['time'][-1]:.1f}s")
        
        return history

# 主實驗
if __name__ == "__main__":
    print("=" * 60)
    print("Evolution Strategy vs SGD Baseline Comparison")
    print("=" * 60)
    
    x_train, y_train, x_test, y_test = load_data()
    
    results = {}
    
    # 實驗1: 純ES
    print("\n[1/3] 訓練純ES模型...")
    model_es = MLP([784, 128, 64, 10])
    es_optimizer = PureES(model_es, population_size=50, sigma=0.1, learning_rate=0.03)
    results['ES'] = es_optimizer.train(x_train, y_train, x_test, y_test, 
                                        generations=200, sample_size=1000)
    
    # 實驗2: 純SGD
    print("\n[2/3] 訓練純SGD模型...")
    model_sgd = MLP([784, 128, 64, 10])
    sgd_optimizer = PureSGD(model_sgd, learning_rate=0.01, batch_size=128)
    results['SGD'] = sgd_optimizer.train(x_train, y_train, x_test, y_test, epochs=200)
    
    # 實驗3: 混合ES+SGD
    print("\n[3/3] 訓練混合ES+SGD模型...")
    model_hybrid = MLP([784, 128, 64, 10])
    hybrid_optimizer = HybridESGD(model_hybrid, pop_size=50, es_sigma=0.1, 
                                   es_lr=0.03, sgd_lr=0.001, sgd_batch_size=128)
    results['Hybrid'] = hybrid_optimizer.train(x_train, y_train, x_test, y_test, 
                                                generations=100)
    
    # 繪製對比圖
    fig, axes = plt.subplots(2, 3, figsize=(18, 10))
    
    # 測試準確率 vs 迭代次數
    ax = axes[0, 0]
    for name, history in results.items():
        iterations = range(1, len(history['test_acc']) + 1)
        ax.plot(iterations, history['test_acc'], label=name, linewidth=2)
    ax.set_xlabel('Iterations (Generations/Epochs)')
    ax.set_ylabel('Test Accuracy')
    ax.set_title('Test Accuracy vs Iterations')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    # 測試準確率 vs 時間
    ax = axes[0, 1]
    for name, history in results.items():
        ax.plot(history['time'], history['test_acc'], label=name, linewidth=2)
    ax.set_xlabel('Time (seconds)')
    ax.set_ylabel('Test Accuracy')
    ax.set_title('Test Accuracy vs Time (Efficiency)')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    # 測試準確率 vs 樣本數
    ax = axes[0, 2]
    for name, history in results.items():
        samples_millions = np.array(history['samples_seen']) / 1e6
        ax.plot(samples_millions, history['test_acc'], label=name, linewidth=2)
    ax.set_xlabel('Samples Seen (Millions)')
    ax.set_ylabel('Test Accuracy')
    ax.set_title('Test Accuracy vs Samples (Sample Efficiency)')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    # 訓練損失
    ax = axes[1, 0]
    for name, history in results.items():
        iterations = range(1, len(history['train_loss']) + 1)
        ax.plot(iterations, history['train_loss'], label=name, linewidth=2)
    ax.set_xlabel('Iterations')
    ax.set_ylabel('Training Loss')
    ax.set_title('Training Loss vs Iterations')
    ax.legend()
    ax.grid(True, alpha=0.3)
    ax.set_yscale('log')
    
    # 收斂速度比較（達到特定準確率的時間）
    ax = axes[1, 1]
    target_accs = [0.80, 0.85, 0.90, 0.92, 0.94]
    x_pos = np.arange(len(target_accs))
    width = 0.25
    
    for idx, (name, history) in enumerate(results.items()):
        times_to_target = []
        for target in target_accs:
            test_accs = np.array(history['test_acc'])
            times = np.array(history['time'])
            indices = np.where(test_accs >= target)[0]
            if len(indices) > 0:
                times_to_target.append(times[indices[0]])
            else:
                times_to_target.append(times[-1])
        ax.bar(x_pos + idx * width, times_to_target, width, label=name)
    
    ax.set_xlabel('Target Accuracy')
    ax.set_ylabel('Time to Reach (seconds)')
    ax.set_title('Convergence Speed Comparison')
    ax.set_xticks(x_pos + width)
    ax.set_xticklabels([f'{acc:.0%}' for acc in target_accs])
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')
    
    # 最終性能比較
    ax = axes[1, 2]
    methods = list(results.keys())
    final_test_acc = [results[m]['test_acc'][-1] for m in methods]
    final_time = [results[m]['time'][-1] for m in methods]
    final_samples = [results[m]['samples_seen'][-1] / 1e6 for m in methods]
    
    x_pos = np.arange(len(methods))
    width = 0.25
    ax.bar(x_pos - width, final_test_acc, width, label='Test Accuracy', alpha=0.8)
    ax.bar(x_pos, [t/max(final_time) for t in final_time], width, 
           label='Rel. Time', alpha=0.8)
    ax.bar(x_pos + width, [s/max(final_samples) for s in final_samples], width,
           label='Rel. Samples', alpha=0.8)
    
    ax.set_ylabel('Value (normalized)')
    ax.set_title('Final Performance Metrics')
    ax.set_xticks(x_pos)
    ax.set_xticklabels(methods)
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')
    
    plt.tight_layout()
    plt.savefig('es_sgd_comparison.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    # 打印統計摘要
    print("\n" + "=" * 60)
    print("FINAL RESULTS SUMMARY")
    print("=" * 60)
    for name, history in results.items():
        print(f"\n{name}:")
        print(f"  最終測試準確率: {history['test_acc'][-1]:.4f}")
        print(f"  最佳測試準確率: {max(history['test_acc']):.4f}")
        print(f"  總訓練時間: {history['time'][-1]:.1f}秒")
        print(f"  總樣本數: {history['samples_seen'][-1]/1e6:.2f}M")
        print(f"  樣本效率 (acc/M samples): {history['test_acc'][-1]/(history['samples_seen'][-1]/1e6):.4f}")
        print(f"  時間效率 (acc/min): {history['test_acc'][-1]/(history['time'][-1]/60):.4f}")
    
    # 保存結果到JSON
    summary = {
        name: {
            'final_test_acc': float(history['test_acc'][-1]),
            'best_test_acc': float(max(history['test_acc'])),
            'total_time': float(history['time'][-1]),
            'total_samples': int(history['samples_seen'][-1]),
            'iterations': len(history['test_acc'])
        }
        for name, history in results.items()
    }
    
    with open('comparison_results.json', 'w') as f:
        json.dump(summary, f, indent=2)
    
    print("\n結果已保存至: es_sgd_comparison.png 和 comparison_results.json")

In [None]:
import numpy as np
from tensorflow import keras
from tensorflow.keras.datasets import mnist
import matplotlib.pyplot as plt
import time
import json

# 載入並預處理MNIST數據
def load_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 784).astype('float32') / 255.0
    y_train = keras.utils.to_categorical(y_train, 10)
    y_test = keras.utils.to_categorical(y_test, 10)
    return x_train, y_train, x_test, y_test

# Learning Rate Schedulers
class LinearWarmupCosineAnnealing:
    """Linear warmup followed by cosine annealing"""
    def __init__(self, max_lr, min_lr, warmup_steps, total_steps):
        self.max_lr = max_lr
        self.min_lr = min_lr
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
    
    def get_lr(self, step):
        if step < self.warmup_steps:
            # Linear warmup
            return self.min_lr + (self.max_lr - self.min_lr) * step / self.warmup_steps
        else:
            # Cosine annealing
            progress = (step - self.warmup_steps) / (self.total_steps - self.warmup_steps)
            return self.min_lr + (self.max_lr - self.min_lr) * 0.5 * (1 + np.cos(np.pi * progress))

class AdaptiveSigmaScheduler:
    """Adaptive sigma for ES based on improvement rate"""
    def __init__(self, initial_sigma, min_sigma, max_sigma, decay_factor=0.99):
        self.sigma = initial_sigma
        self.min_sigma = min_sigma
        self.max_sigma = max_sigma
        self.decay_factor = decay_factor
        self.prev_best_reward = None
    
    def update(self, best_reward, avg_reward):
        """Update sigma based on progress"""
        if self.prev_best_reward is not None:
            improvement = best_reward - self.prev_best_reward
            
            if improvement > 0.01:  # Good progress, can decay sigma
                self.sigma *= self.decay_factor
            elif improvement < 0.001:  # Stagnation, increase exploration
                self.sigma *= 1.02
        
        self.prev_best_reward = best_reward
        self.sigma = np.clip(self.sigma, self.min_sigma, self.max_sigma)
        return self.sigma

# MLP類別
class MLP:
    def __init__(self, layers):
        self.layers = layers
        self.weights = []
        self.biases = []
        for i in range(len(layers) - 1):
            w = np.random.randn(layers[i], layers[i+1]) * np.sqrt(2.0 / layers[i])
            b = np.zeros((1, layers[i+1]))
            self.weights.append(w)
            self.biases.append(b)
    
    def relu(self, x):
        return np.maximum(0, x)
    
    def softmax(self, x):
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)
    
    def forward(self, x):
        a = x
        for i in range(len(self.weights) - 1):
            z = np.dot(a, self.weights[i]) + self.biases[i]
            a = self.relu(z)
        z = np.dot(a, self.weights[-1]) + self.biases[-1]
        a = self.softmax(z)
        return a
    
    def get_params(self):
        params = []
        for w, b in zip(self.weights, self.biases):
            params.append(w.flatten())
            params.append(b.flatten())
        return np.concatenate(params)
    
    def set_params(self, params):
        idx = 0
        for i in range(len(self.weights)):
            w_shape = self.weights[i].shape
            b_shape = self.biases[i].shape
            w_size = np.prod(w_shape)
            b_size = np.prod(b_shape)
            self.weights[i] = params[idx:idx+w_size].reshape(w_shape)
            idx += w_size
            self.biases[i] = params[idx:idx+b_size].reshape(b_shape)
            idx += b_size
    
    def evaluate(self, x, y):
        pred = self.forward(x)
        acc = np.mean(np.argmax(pred, axis=1) == np.argmax(y, axis=1))
        return acc
    
    def compute_loss(self, x, y):
        pred = self.forward(x)
        pred = np.clip(pred, 1e-10, 1 - 1e-10)
        loss = -np.mean(np.sum(y * np.log(pred), axis=1))
        return loss
    
    def backward(self, x, y):
        m = x.shape[0]
        activations = [x]
        z_values = []
        
        a = x
        for i in range(len(self.weights) - 1):
            z = np.dot(a, self.weights[i]) + self.biases[i]
            z_values.append(z)
            a = self.relu(z)
            activations.append(a)
        
        z = np.dot(a, self.weights[-1]) + self.biases[-1]
        z_values.append(z)
        output = self.softmax(z)
        
        dz = output - y
        gradients_w = []
        gradients_b = []
        
        for i in range(len(self.weights) - 1, -1, -1):
            dw = np.dot(activations[i].T, dz) / m
            db = np.sum(dz, axis=0, keepdims=True) / m
            gradients_w.insert(0, dw)
            gradients_b.insert(0, db)
            
            if i > 0:
                dz = np.dot(dz, self.weights[i].T)
                dz[z_values[i-1] <= 0] = 0
        
        return gradients_w, gradients_b
    
    def sgd_update(self, x, y, learning_rate=0.01):
        grad_w, grad_b = self.backward(x, y)
        for i in range(len(self.weights)):
            self.weights[i] -= learning_rate * grad_w[i]
            self.biases[i] -= learning_rate * grad_b[i]

# 純ES優化器（帶adaptive sigma）
class PureESAdaptive:
    def __init__(self, model, population_size=50, initial_sigma=0.1, learning_rate=0.03):
        self.model = model
        self.pop_size = population_size
        self.lr = learning_rate
        self.param_size = len(model.get_params())
        self.sigma_scheduler = AdaptiveSigmaScheduler(
            initial_sigma=initial_sigma,
            min_sigma=0.001,
            max_sigma=0.3,
            decay_factor=0.995
        )
    
    def train(self, x_train, y_train, x_test, y_test, generations=200, sample_size=1000):
        history = {
            'train_acc': [], 'test_acc': [], 'train_loss': [],
            'time': [], 'samples_seen': [], 'sigma': []
        }
        best_params = self.model.get_params()
        total_samples = 0
        start_time = time.time()
        
        for gen in range(generations):
            idx = np.random.choice(len(x_train), sample_size, replace=False)
            x_sample = x_train[idx]
            y_sample = y_train[idx]
            
            sigma = self.sigma_scheduler.sigma
            noise = np.random.randn(self.pop_size, self.param_size)
            rewards = np.zeros(self.pop_size)
            
            for i in range(self.pop_size):
                params_try = best_params + sigma * noise[i]
                self.model.set_params(params_try)
                rewards[i] = self.model.evaluate(x_sample, y_sample)
            
            total_samples += self.pop_size * sample_size
            
            # Update sigma based on progress
            best_reward = np.max(rewards)
            avg_reward = np.mean(rewards)
            sigma = self.sigma_scheduler.update(best_reward, avg_reward)
            
            rewards_norm = (rewards - np.mean(rewards)) / (np.std(rewards) + 1e-8)
            best_params += self.lr / (self.pop_size * sigma) * np.dot(noise.T, rewards_norm)
            self.model.set_params(best_params)
            
            train_acc = self.model.evaluate(x_sample, y_sample)
            test_acc = self.model.evaluate(x_test, y_test)
            train_loss = self.model.compute_loss(x_sample, y_sample)
            
            history['train_acc'].append(train_acc)
            history['test_acc'].append(test_acc)
            history['train_loss'].append(train_loss)
            history['time'].append(time.time() - start_time)
            history['samples_seen'].append(total_samples)
            history['sigma'].append(sigma)
            
            if (gen + 1) % 20 == 0:
                print(f"Gen {gen+1}/{generations} - Train: {train_acc:.4f}, "
                      f"Test: {test_acc:.4f}, Sigma: {sigma:.4f}, Time: {history['time'][-1]:.1f}s")
        
        return history

# SGD with Linear Warmup + Cosine Annealing
class SGDWithSchedule:
    def __init__(self, model, max_lr=0.1, min_lr=0.0001, batch_size=128, warmup_epochs=10):
        self.model = model
        self.max_lr = max_lr
        self.min_lr = min_lr
        self.batch_size = batch_size
        self.warmup_epochs = warmup_epochs
    
    def train(self, x_train, y_train, x_test, y_test, epochs=200):
        history = {
            'train_acc': [], 'test_acc': [], 'train_loss': [],
            'time': [], 'samples_seen': [], 'lr': []
        }
        total_samples = 0
        start_time = time.time()
        
        num_batches_per_epoch = len(x_train) // self.batch_size
        total_steps = epochs * num_batches_per_epoch
        warmup_steps = self.warmup_epochs * num_batches_per_epoch
        
        lr_scheduler = LinearWarmupCosineAnnealing(
            max_lr=self.max_lr,
            min_lr=self.min_lr,
            warmup_steps=warmup_steps,
            total_steps=total_steps
        )
        
        global_step = 0
        
        for epoch in range(epochs):
            indices = np.random.permutation(len(x_train))
            x_shuffled = x_train[indices]
            y_shuffled = y_train[indices]
            
            for batch_idx in range(num_batches_per_epoch):
                start_idx = batch_idx * self.batch_size
                end_idx = start_idx + self.batch_size
                x_batch = x_shuffled[start_idx:end_idx]
                y_batch = y_shuffled[start_idx:end_idx]
                
                lr = lr_scheduler.get_lr(global_step)
                self.model.sgd_update(x_batch, y_batch, lr)
                
                total_samples += self.batch_size
                global_step += 1
            
            eval_size = 1000
            eval_idx = np.random.choice(len(x_train), eval_size, replace=False)
            train_acc = self.model.evaluate(x_train[eval_idx], y_train[eval_idx])
            test_acc = self.model.evaluate(x_test, y_test)
            train_loss = self.model.compute_loss(x_train[eval_idx], y_train[eval_idx])
            current_lr = lr_scheduler.get_lr(global_step - 1)
            
            history['train_acc'].append(train_acc)
            history['test_acc'].append(test_acc)
            history['train_loss'].append(train_loss)
            history['time'].append(time.time() - start_time)
            history['samples_seen'].append(total_samples)
            history['lr'].append(current_lr)
            
            if (epoch + 1) % 20 == 0:
                print(f"Epoch {epoch+1}/{epochs} - Train: {train_acc:.4f}, "
                      f"Test: {test_acc:.4f}, LR: {current_lr:.6f}, Time: {history['time'][-1]:.1f}s")
        
        return history

# Hybrid ES+SGD with adaptive schedules (Sequential Strategy)
class HybridESGDAdaptive:
    """
    Hybrid strategy: ES for exploration -> SGD for exploitation
    Key: ES and SGD are applied SEQUENTIALLY, not overlapping
    """
    def __init__(self, model, pop_size=50, initial_es_sigma=0.1, es_lr=0.03,
                 max_sgd_lr=0.01, min_sgd_lr=0.0001, sgd_batch_size=128):
        self.model = model
        self.pop_size = pop_size
        self.es_lr = es_lr
        self.batch_size = sgd_batch_size
        self.param_size = len(model.get_params())
        
        # Adaptive sigma for ES
        self.sigma_scheduler = AdaptiveSigmaScheduler(
            initial_sigma=initial_es_sigma,
            min_sigma=0.001,
            max_sigma=0.2,
            decay_factor=0.995
        )
        
        # Store SGD LR range
        self.max_sgd_lr = max_sgd_lr
        self.min_sgd_lr = min_sgd_lr
    
    def train(self, x_train, y_train, x_test, y_test, generations=100, 
              es_sample_size=1000):
        history = {
            'train_acc': [], 'test_acc': [], 'train_loss': [],
            'best_individual_acc': [], 'time': [], 'samples_seen': [],
            'sigma': [], 'sgd_lr': []
        }
        best_params = self.model.get_params()
        total_samples = 0
        start_time = time.time()
        
        # SGD scheduler for the hybrid approach
        num_batches_per_gen = len(x_train) // self.batch_size
        total_sgd_steps = generations * num_batches_per_gen
        
        sgd_scheduler = LinearWarmupCosineAnnealing(
            max_lr=self.max_sgd_lr,
            min_lr=self.min_sgd_lr,
            warmup_steps=10 * num_batches_per_gen,  # 10 generations warmup
            total_steps=total_sgd_steps
        )
        
        global_sgd_step = 0
        
        for gen in range(generations):
            # ========== PHASE 1: ES Exploration ==========
            idx = np.random.choice(len(x_train), es_sample_size, replace=False)
            x_sample = x_train[idx]
            y_sample = y_train[idx]
            
            sigma = self.sigma_scheduler.sigma
            noise = np.random.randn(self.pop_size, self.param_size)
            rewards = np.zeros(self.pop_size)
            all_params = []
            
            # Evaluate population
            for i in range(self.pop_size):
                params_try = best_params + sigma * noise[i]
                all_params.append(params_try)
                self.model.set_params(params_try)
                rewards[i] = self.model.evaluate(x_sample, y_sample)
            
            total_samples += self.pop_size * es_sample_size
            
            # Find best individual from ES exploration
            best_idx = np.argmax(rewards)
            best_individual_params = all_params[best_idx]
            best_individual_acc = rewards[best_idx]
            history['best_individual_acc'].append(best_individual_acc)
            
            # Update sigma based on ES performance
            avg_reward = np.mean(rewards)
            sigma = self.sigma_scheduler.update(best_individual_acc, avg_reward)
            
            # ========== PHASE 2: SGD Local Refinement ==========
            # Start from the best ES individual
            self.model.set_params(best_individual_params)
            
            # Run SGD for one epoch on the best individual
            indices = np.random.permutation(len(x_train))
            x_shuffled = x_train[indices]
            y_shuffled = y_train[indices]
            
            for batch_idx in range(num_batches_per_gen):
                start_idx = batch_idx * self.batch_size
                end_idx = start_idx + self.batch_size
                x_batch = x_shuffled[start_idx:end_idx]
                y_batch = y_shuffled[start_idx:end_idx]
                
                current_sgd_lr = sgd_scheduler.get_lr(global_sgd_step)
                self.model.sgd_update(x_batch, y_batch, current_sgd_lr)
                
                total_samples += self.batch_size
                global_sgd_step += 1
            
            # SGD-refined parameters become the new baseline for next generation
            best_params = self.model.get_params()
            
            # Evaluate final model (after both ES and SGD)
            eval_idx = np.random.choice(len(x_train), 1000, replace=False)
            train_acc = self.model.evaluate(x_train[eval_idx], y_train[eval_idx])
            test_acc = self.model.evaluate(x_test, y_test)
            train_loss = self.model.compute_loss(x_train[eval_idx], y_train[eval_idx])
            
            history['train_acc'].append(train_acc)
            history['test_acc'].append(test_acc)
            history['train_loss'].append(train_loss)
            history['time'].append(time.time() - start_time)
            history['samples_seen'].append(total_samples)
            history['sigma'].append(sigma)
            history['sgd_lr'].append(current_sgd_lr)
            
            if (gen + 1) % 10 == 0:
                print(f"Gen {gen+1}/{generations} - Best ES: {best_individual_acc:.4f}, "
                      f"After SGD: {train_acc:.4f}, Test: {test_acc:.4f}, "
                      f"Sigma: {sigma:.4f}, SGD_LR: {current_sgd_lr:.6f}, "
                      f"Time: {history['time'][-1]:.1f}s")
        
        return history

# 主實驗
if __name__ == "__main__":
    print("=" * 70)
    print("Evolution Strategy vs SGD - Advanced Comparison with Adaptive Schedules")
    print("=" * 70)
    
    x_train, y_train, x_test, y_test = load_data()
    results = {}
    
    # 實驗1: ES with Adaptive Sigma
    print("\n[1/3] Training ES with Adaptive Sigma...")
    model_es = MLP([784, 128, 64, 10])
    es_optimizer = PureESAdaptive(model_es, population_size=50, 
                                   initial_sigma=0.1, learning_rate=0.03)
    results['ES-Adaptive'] = es_optimizer.train(x_train, y_train, x_test, y_test, 
                                                 generations=200, sample_size=1000)
    
    # 實驗2: SGD with Linear Warmup + Cosine Annealing
    print("\n[2/3] Training SGD with Warmup+Cosine Schedule...")
    model_sgd = MLP([784, 128, 64, 10])
    sgd_optimizer = SGDWithSchedule(model_sgd, max_lr=0.1, min_lr=0.0001, 
                                     batch_size=128, warmup_epochs=10)
    results['SGD-Scheduled'] = sgd_optimizer.train(x_train, y_train, x_test, y_test, 
                                                    epochs=200)
    
    # 實驗3: Hybrid ES+SGD with Adaptive Schedules
    print("\n[3/3] Training Hybrid ES+SGD with Adaptive Schedules...")
    model_hybrid = MLP([784, 128, 64, 10])
    hybrid_optimizer = HybridESGDAdaptive(model_hybrid, pop_size=50, 
                                           initial_es_sigma=0.1, es_lr=0.03,
                                           max_sgd_lr=0.01, min_sgd_lr=0.0001,
                                           sgd_batch_size=128)
    results['Hybrid-Adaptive'] = hybrid_optimizer.train(x_train, y_train, x_test, y_test, 
                                                         generations=100)
    
    # 繪製對比圖
    fig = plt.figure(figsize=(20, 12))
    gs = fig.add_gridspec(3, 4, hspace=0.3, wspace=0.3)
    
    # Row 1: Performance Metrics
    ax1 = fig.add_subplot(gs[0, 0])
    for name, history in results.items():
        iterations = range(1, len(history['test_acc']) + 1)
        ax1.plot(iterations, history['test_acc'], label=name, linewidth=2)
    ax1.set_xlabel('Iterations')
    ax1.set_ylabel('Test Accuracy')
    ax1.set_title('Test Accuracy vs Iterations')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    ax2 = fig.add_subplot(gs[0, 1])
    for name, history in results.items():
        ax2.plot(history['time'], history['test_acc'], label=name, linewidth=2)
    ax2.set_xlabel('Time (seconds)')
    ax2.set_ylabel('Test Accuracy')
    ax2.set_title('Test Accuracy vs Time')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    ax3 = fig.add_subplot(gs[0, 2])
    for name, history in results.items():
        samples_millions = np.array(history['samples_seen']) / 1e6
        ax3.plot(samples_millions, history['test_acc'], label=name, linewidth=2)
    ax3.set_xlabel('Samples Seen (Millions)')
    ax3.set_ylabel('Test Accuracy')
    ax3.set_title('Sample Efficiency')
    ax3.legend()
    ax3.grid(True, alpha=0.3)
    
    ax4 = fig.add_subplot(gs[0, 3])
    for name, history in results.items():
        iterations = range(1, len(history['train_loss']) + 1)
        ax4.plot(iterations, history['train_loss'], label=name, linewidth=2)
    ax4.set_xlabel('Iterations')
    ax4.set_ylabel('Training Loss')
    ax4.set_title('Training Loss')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    ax4.set_yscale('log')
    
    # Row 2: Adaptive Parameters
    ax5 = fig.add_subplot(gs[1, 0])
    if 'sigma' in results['ES-Adaptive']:
        iterations = range(1, len(results['ES-Adaptive']['sigma']) + 1)
        ax5.plot(iterations, results['ES-Adaptive']['sigma'], 
                label='ES-Adaptive', color='C0', linewidth=2)
    if 'sigma' in results['Hybrid-Adaptive']:
        iterations = range(1, len(results['Hybrid-Adaptive']['sigma']) + 1)
        ax5.plot(iterations, results['Hybrid-Adaptive']['sigma'], 
                label='Hybrid-Adaptive', color='C2', linewidth=2)
    ax5.set_xlabel('Iterations')
    ax5.set_ylabel('Sigma (Noise Level)')
    ax5.set_title('ES Sigma Adaptation')
    ax5.legend()
    ax5.grid(True, alpha=0.3)
    
    ax6 = fig.add_subplot(gs[1, 1])
    if 'lr' in results['SGD-Scheduled']:
        iterations = range(1, len(results['SGD-Scheduled']['lr']) + 1)
        ax6.plot(iterations, results['SGD-Scheduled']['lr'], 
                label='SGD-Scheduled', color='C1', linewidth=2)
    if 'sgd_lr' in results['Hybrid-Adaptive']:
        iterations = range(1, len(results['Hybrid-Adaptive']['sgd_lr']) + 1)
        ax6.plot(iterations, results['Hybrid-Adaptive']['sgd_lr'], 
                label='Hybrid-Adaptive', color='C2', linewidth=2)
    ax6.set_xlabel('Iterations')
    ax6.set_ylabel('Learning Rate')
    ax6.set_title('SGD Learning Rate Schedule')
    ax6.legend()
    ax6.grid(True, alpha=0.3)
    ax6.set_yscale('log')
    
    # Convergence speed
    ax7 = fig.add_subplot(gs[1, 2])
    target_accs = [0.80, 0.85, 0.90, 0.92, 0.94]
    x_pos = np.arange(len(target_accs))
    width = 0.25
    
    for idx, (name, history) in enumerate(results.items()):
        times_to_target = []
        for target in target_accs:
            test_accs = np.array(history['test_acc'])
            times = np.array(history['time'])
            indices = np.where(test_accs >= target)[0]
            if len(indices) > 0:
                times_to_target.append(times[indices[0]])
            else:
                times_to_target.append(times[-1])
        ax7.bar(x_pos + idx * width, times_to_target, width, label=name)
    
    ax7.set_xlabel('Target Accuracy')
    ax7.set_ylabel('Time to Reach (seconds)')
    ax7.set_title('Convergence Speed')
    ax7.set_xticks(x_pos + width)
    ax7.set_xticklabels([f'{acc:.0%}' for acc in target_accs])
    ax7.legend()
    ax7.grid(True, alpha=0.3, axis='y')
    
    # Final performance comparison
    ax8 = fig.add_subplot(gs[1, 3])
    methods = list(results.keys())
    final_test_acc = [results[m]['test_acc'][-1] for m in methods]
    final_time = [results[m]['time'][-1] for m in methods]
    final_samples = [results[m]['samples_seen'][-1] / 1e6 for m in methods]
    
    x_pos = np.arange(len(methods))
    width = 0.25
    ax8.bar(x_pos - width, final_test_acc, width, label='Test Acc', alpha=0.8)
    ax8.bar(x_pos, [t/max(final_time) for t in final_time], width, 
           label='Rel. Time', alpha=0.8)
    ax8.bar(x_pos + width, [s/max(final_samples) for s in final_samples], width,
           label='Rel. Samples', alpha=0.8)
    
    ax8.set_ylabel('Value')
    ax8.set_title('Final Performance')
    ax8.set_xticks(x_pos)
    ax8.set_xticklabels(methods, rotation=15, ha='right')
    ax8.legend()
    ax8.grid(True, alpha=0.3, axis='y')
    
    # Row 3: Training dynamics
    ax9 = fig.add_subplot(gs[2, :2])
    for name, history in results.items():
        iterations = range(1, len(history['test_acc']) + 1)
        ax9.plot(iterations, history['test_acc'], label=f'{name} (Test)', linewidth=2)
        ax9.plot(iterations, history['train_acc'], label=f'{name} (Train)', 
                linewidth=1.5, linestyle='--', alpha=0.7)
    ax9.set_xlabel('Iterations')
    ax9.set_ylabel('Accuracy')
    ax9.set_title('Train vs Test Accuracy')
    ax9.legend(ncol=2, fontsize=8)
    ax9.grid(True, alpha=0.3)
    
    # Efficiency metrics
    ax10 = fig.add_subplot(gs[2, 2:])
    metrics_names = ['Final Acc', 'Time Efficiency\n(acc/min)', 'Sample Efficiency\n(acc/M samples)']
    
    for idx, name in enumerate(methods):
        history = results[name]
        final_acc = history['test_acc'][-1]
        time_eff = final_acc / (history['time'][-1] / 60)
        sample_eff = final_acc / (history['samples_seen'][-1] / 1e6)
        
        values = [final_acc, time_eff * 10, sample_eff]  # Scale for visibility
        x_pos = np.arange(len(metrics_names))
        ax10.bar(x_pos + idx * 0.25, values, 0.25, label=name, alpha=0.8)

In [None]:
import numpy as np
from tensorflow import keras
from tensorflow.keras.datasets import mnist
import matplotlib.pyplot as plt
import time
import json

# 載入MNIST數據
def load_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 784).astype('float32') / 255.0
    y_train = keras.utils.to_categorical(y_train, 10)
    y_test = keras.utils.to_categorical(y_test, 10)
    return x_train, y_train, x_test, y_test

# MLP基礎類別
class MLP:
    def __init__(self, layers):
        self.layers = layers
        self.weights = []
        self.biases = []
        for i in range(len(layers) - 1):
            w = np.random.randn(layers[i], layers[i+1]) * np.sqrt(2.0 / layers[i])
            b = np.zeros((1, layers[i+1]))
            self.weights.append(w)
            self.biases.append(b)
    
    def relu(self, x):
        return np.maximum(0, x)
    
    def softmax(self, x):
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)
    
    def forward(self, x):
        a = x
        for i in range(len(self.weights) - 1):
            z = np.dot(a, self.weights[i]) + self.biases[i]
            a = self.relu(z)
        z = np.dot(a, self.weights[-1]) + self.biases[-1]
        a = self.softmax(z)
        return a
    
    def get_params(self):
        params = []
        for w, b in zip(self.weights, self.biases):
            params.append(w.flatten())
            params.append(b.flatten())
        return np.concatenate(params)
    
    def set_params(self, params):
        idx = 0
        for i in range(len(self.weights)):
            w_shape = self.weights[i].shape
            b_shape = self.biases[i].shape
            w_size = np.prod(w_shape)
            b_size = np.prod(b_shape)
            self.weights[i] = params[idx:idx+w_size].reshape(w_shape)
            idx += w_size
            self.biases[i] = params[idx:idx+b_size].reshape(b_shape)
            idx += b_size
    
    def evaluate(self, x, y):
        pred = self.forward(x)
        acc = np.mean(np.argmax(pred, axis=1) == np.argmax(y, axis=1))
        return acc
    
    def compute_loss(self, x, y):
        pred = self.forward(x)
        pred = np.clip(pred, 1e-10, 1 - 1e-10)
        loss = -np.mean(np.sum(y * np.log(pred), axis=1))
        return loss
    
    def backward(self, x, y):
        m = x.shape[0]
        activations = [x]
        z_values = []
        
        a = x
        for i in range(len(self.weights) - 1):
            z = np.dot(a, self.weights[i]) + self.biases[i]
            z_values.append(z)
            a = self.relu(z)
            activations.append(a)
        
        z = np.dot(a, self.weights[-1]) + self.biases[-1]
        z_values.append(z)
        output = self.softmax(z)
        
        dz = output - y
        gradients_w = []
        gradients_b = []
        
        for i in range(len(self.weights) - 1, -1, -1):
            dw = np.dot(activations[i].T, dz) / m
            db = np.sum(dz, axis=0, keepdims=True) / m
            gradients_w.insert(0, dw)
            gradients_b.insert(0, db)
            
            if i > 0:
                dz = np.dot(dz, self.weights[i].T)
                dz[z_values[i-1] <= 0] = 0
        
        return gradients_w, gradients_b
    
    def sgd_update(self, x, y, learning_rate=0.01):
        grad_w, grad_b = self.backward(x, y)
        for i in range(len(self.weights)):
            self.weights[i] -= learning_rate * grad_w[i]
            self.biases[i] -= learning_rate * grad_b[i]

# Advanced Hybrid ES-SGD based on OpenAI-ES insights
class AdvancedHybridOptimizer:
    """
    基於OpenAI-ES論文的進階混合優化器
    
    關鍵特性：
    1. Limited Perturbations: 只擾動部分參數（大幅加速）
    2. Rank-based Fitness Shaping: 使用排名而非原始獎勵
    3. Interleaved ES-SGD: 交錯使用ES探索和SGD精煉
    4. Adaptive Noise & LR: 動態調整探索範圍和學習率
    """
    
    def __init__(self, model, 
                 pop_size=50,
                 initial_sigma=0.1,
                 es_lr=0.03,
                 max_sgd_lr=0.01,
                 min_sgd_lr=0.0001,
                 sgd_batch_size=128,
                 perturb_fraction=0.5):  # 擾動參數的比例
        self.model = model
        self.pop_size = pop_size
        self.es_lr = es_lr
        self.batch_size = sgd_batch_size
        self.param_size = len(model.get_params())
        
        # Limited Perturbations: 只擾動部分參數
        self.perturb_size = int(self.param_size * perturb_fraction)
        print(f"Limited Perturbations: {self.perturb_size}/{self.param_size} "
              f"({perturb_fraction*100:.0f}%) parameters will be perturbed")
        
        # Adaptive schedulers
        self.sigma = initial_sigma
        self.min_sigma = 0.001
        self.max_sigma = 0.3
        self.sigma_decay = 0.995
        
        self.max_sgd_lr = max_sgd_lr
        self.min_sgd_lr = min_sgd_lr
        
        # Track performance for adaptive adjustment
        self.prev_best_fitness = None
        self.stagnation_count = 0
    
    def rank_fitness_shaping(self, rewards):
        """Rank-based fitness shaping (更穩定的獎勵信號)"""
        ranks = np.argsort(np.argsort(rewards))  # 獲得排名
        # 使用utility函數轉換排名
        utilities = np.maximum(0, np.log(self.pop_size/2 + 1) - np.log(ranks + 1))
        utilities = utilities / np.sum(utilities) - 1.0 / self.pop_size
        return utilities
    
    def limited_perturbation_noise(self):
        """
        Limited Perturbations: 只擾動部分參數
        基於OpenAI-ES論文，可獲得60+倍加速
        """
        # 為每個個體隨機選擇要擾動的參數索引
        perturbations = []
        indices_list = []
        
        for _ in range(self.pop_size):
            # 隨機選擇要擾動的參數索引
            indices = np.random.choice(self.param_size, self.perturb_size, replace=False)
            indices_list.append(indices)
            
            # 創建稀疏擾動向量
            noise = np.zeros(self.param_size)
            noise[indices] = np.random.randn(self.perturb_size)
            perturbations.append(noise)
        
        return np.array(perturbations), indices_list
    
    def update_sigma(self, best_fitness):
        """基於性能動態調整sigma"""
        if self.prev_best_fitness is not None:
            improvement = best_fitness - self.prev_best_fitness
            
            if improvement > 0.01:
                # 顯著進步：減小探索範圍
                self.sigma *= self.sigma_decay
                self.stagnation_count = 0
            elif improvement < 0.001:
                # 停滯：增加探索
                self.stagnation_count += 1
                if self.stagnation_count > 5:
                    self.sigma *= 1.05
                    self.stagnation_count = 0
        
        self.prev_best_fitness = best_fitness
        self.sigma = np.clip(self.sigma, self.min_sigma, self.max_sigma)
    
    def train(self, x_train, y_train, x_test, y_test, 
              generations=100,
              es_sample_size=1000,
              sgd_frequency=1):  # 每N代做一次SGD
        """
        交錯式ES-SGD訓練
        sgd_frequency: 控制ES和SGD的交錯頻率
        """
        history = {
            'train_acc': [], 'test_acc': [], 'train_loss': [],
            'best_individual_acc': [], 'time': [], 'samples_seen': [],
            'sigma': [], 'sgd_lr': [], 'es_gradient_norm': []
        }
        
        best_params = self.model.get_params()
        total_samples = 0
        start_time = time.time()
        
        # SGD scheduler
        num_batches_per_gen = len(x_train) // self.batch_size
        total_sgd_steps = (generations // sgd_frequency) * num_batches_per_gen
        global_sgd_step = 0
        
        for gen in range(generations):
            # ========== ES Phase ==========
            idx = np.random.choice(len(x_train), es_sample_size, replace=False)
            x_sample = x_train[idx]
            y_sample = y_train[idx]
            
            # Limited Perturbations
            noise, indices_list = self.limited_perturbation_noise()
            rewards = np.zeros(self.pop_size)
            all_params = []
            
            # 評估擾動族群
            for i in range(self.pop_size):
                params_try = best_params + self.sigma * noise[i]
                all_params.append(params_try)
                self.model.set_params(params_try)
                rewards[i] = self.model.evaluate(x_sample, y_sample)
            
            total_samples += self.pop_size * es_sample_size
            
            # Rank-based fitness shaping
            utilities = self.rank_fitness_shaping(rewards)
            
            # 計算ES梯度估計
            es_gradient = np.dot(noise.T, utilities) / self.sigma
            es_gradient_norm = np.linalg.norm(es_gradient)
            
            # 更新參數
            best_params += self.es_lr * es_gradient
            
            # 找到最佳個體
            best_idx = np.argmax(rewards)
            best_individual_acc = rewards[best_idx]
            
            # 更新sigma
            self.update_sigma(best_individual_acc)
            
            # ========== SGD Phase (交錯執行) ==========
            if (gen + 1) % sgd_frequency == 0:
                # 從ES優化後的參數開始SGD
                self.model.set_params(best_params)
                
                # 計算當前SGD學習率
                progress = global_sgd_step / max(total_sgd_steps, 1)
                current_sgd_lr = self.min_sgd_lr + (self.max_sgd_lr - self.min_sgd_lr) * \
                                 0.5 * (1 + np.cos(np.pi * progress))
                
                # Mini-batch SGD
                indices = np.random.permutation(len(x_train))
                x_shuffled = x_train[indices]
                y_shuffled = y_train[indices]
                
                for batch_idx in range(num_batches_per_gen):
                    start_idx = batch_idx * self.batch_size
                    end_idx = start_idx + self.batch_size
                    x_batch = x_shuffled[start_idx:end_idx]
                    y_batch = y_shuffled[start_idx:end_idx]
                    
                    self.model.sgd_update(x_batch, y_batch, current_sgd_lr)
                    total_samples += self.batch_size
                    global_sgd_step += 1
                
                # SGD後的參數成為新基準
                best_params = self.model.get_params()
            else:
                current_sgd_lr = 0  # 這一代沒有SGD
            
            # 評估
            self.model.set_params(best_params)
            eval_idx = np.random.choice(len(x_train), 1000, replace=False)
            train_acc = self.model.evaluate(x_train[eval_idx], y_train[eval_idx])
            test_acc = self.model.evaluate(x_test, y_test)
            train_loss = self.model.compute_loss(x_train[eval_idx], y_train[eval_idx])
            
            history['train_acc'].append(train_acc)
            history['test_acc'].append(test_acc)
            history['train_loss'].append(train_loss)
            history['best_individual_acc'].append(best_individual_acc)
            history['time'].append(time.time() - start_time)
            history['samples_seen'].append(total_samples)
            history['sigma'].append(self.sigma)
            history['sgd_lr'].append(current_sgd_lr)
            history['es_gradient_norm'].append(es_gradient_norm)
            
            if (gen + 1) % 10 == 0:
                sgd_status = f"SGD_LR: {current_sgd_lr:.6f}" if current_sgd_lr > 0 else "No SGD"
                print(f"Gen {gen+1}/{generations} - Best ES: {best_individual_acc:.4f}, "
                      f"Test: {test_acc:.4f}, Sigma: {self.sigma:.4f}, {sgd_status}, "
                      f"ES_grad_norm: {es_gradient_norm:.2f}, Time: {history['time'][-1]:.1f}s")
        
        return history

# 主實驗
if __name__ == "__main__":
    print("=" * 70)
    print("Advanced Hybrid ES-SGD Optimizer (Based on OpenAI-ES Insights)")
    print("=" * 70)
    
    x_train, y_train, x_test, y_test = load_data()
    
    # 測試不同的配置
    configs = {
        'Limited-Pert-50%': {
            'perturb_fraction': 0.5,
            'sgd_frequency': 1,
            'description': 'Limited Perturbations (50% params) + SGD每代'
        },
        'Limited-Pert-25%': {
            'perturb_fraction': 0.25,
            'sgd_frequency': 1,
            'description': 'Limited Perturbations (25% params) + SGD每代'
        },
        'Interleaved-ES-SGD': {
            'perturb_fraction': 0.5,
            'sgd_frequency': 3,
            'description': 'Limited Pert (50%) + SGD每3代'
        },
        'Pure-ES-Limited': {
            'perturb_fraction': 0.5,
            'sgd_frequency': 999,  # 實質上不做SGD
            'description': 'Pure ES with Limited Perturbations'
        }
    }
    
    results = {}
    
    for name, config in configs.items():
        print(f"\n{'='*70}")
        print(f"Training: {name}")
        print(f"Config: {config['description']}")
        print(f"{'='*70}")
        
        model = MLP([784, 128, 64, 10])
        optimizer = AdvancedHybridOptimizer(
            model=model,
            pop_size=50,
            initial_sigma=0.1,
            es_lr=0.03,
            max_sgd_lr=0.01,
            min_sgd_lr=0.0001,
            sgd_batch_size=128,
            perturb_fraction=config['perturb_fraction']
        )
        
        results[name] = optimizer.train(
            x_train, y_train, x_test, y_test,
            generations=100,
            es_sample_size=1000,
            sgd_frequency=config['sgd_frequency']
        )
    
    # 可視化比較
    fig = plt.figure(figsize=(20, 12))
    gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3)
    
    # Test Accuracy
    ax1 = fig.add_subplot(gs[0, 0])
    for name, history in results.items():
        ax1.plot(history['test_acc'], label=name, linewidth=2)
    ax1.set_xlabel('Generation')
    ax1.set_ylabel('Test Accuracy')
    ax1.set_title('Test Accuracy Comparison')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Time Efficiency
    ax2 = fig.add_subplot(gs[0, 1])
    for name, history in results.items():
        ax2.plot(history['time'], history['test_acc'], label=name, linewidth=2)
    ax2.set_xlabel('Time (seconds)')
    ax2.set_ylabel('Test Accuracy')
    ax2.set_title('Time Efficiency')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    # Sample Efficiency
    ax3 = fig.add_subplot(gs[0, 2])
    for name, history in results.items():
        samples_m = np.array(history['samples_seen']) / 1e6
        ax3.plot(samples_m, history['test_acc'], label=name, linewidth=2)
    ax3.set_xlabel('Samples (Millions)')
    ax3.set_ylabel('Test Accuracy')
    ax3.set_title('Sample Efficiency')
    ax3.legend()
    ax3.grid(True, alpha=0.3)
    
    # Sigma Evolution
    ax4 = fig.add_subplot(gs[1, 0])
    for name, history in results.items():
        ax4.plot(history['sigma'], label=name, linewidth=2)
    ax4.set_xlabel('Generation')
    ax4.set_ylabel('Sigma')
    ax4.set_title('Adaptive Sigma Evolution')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    
    # ES Gradient Norm
    ax5 = fig.add_subplot(gs[1, 1])
    for name, history in results.items():
        ax5.plot(history['es_gradient_norm'], label=name, linewidth=2, alpha=0.7)
    ax5.set_xlabel('Generation')
    ax5.set_ylabel('ES Gradient Norm')
    ax5.set_title('ES Gradient Magnitude')
    ax5.legend()
    ax5.grid(True, alpha=0.3)
    ax5.set_yscale('log')
    
    # Training Loss
    ax6 = fig.add_subplot(gs[1, 2])
    for name, history in results.items():
        ax6.plot(history['train_loss'], label=name, linewidth=2)
    ax6.set_xlabel('Generation')
    ax6.set_ylabel('Training Loss')
    ax6.set_title('Training Loss')
    ax6.legend()
    ax6.grid(True, alpha=0.3)
    ax6.set_yscale('log')
    
    # Best Individual vs Final
    ax7 = fig.add_subplot(gs[2, 0])
    for name, history in results.items():
        ax7.plot(history['best_individual_acc'], label=f'{name} (ES)', 
                linestyle='--', alpha=0.7)
        ax7.plot(history['test_acc'], label=f'{name} (Final)', linewidth=2)
    ax7.set_xlabel('Generation')
    ax7.set_ylabel('Accuracy')
    ax7.set_title('ES Best vs Final Accuracy')
    ax7.legend(fontsize=8)
    ax7.grid(True, alpha=0.3)
    
    # Final Performance Bar Chart
    ax8 = fig.add_subplot(gs[2, 1:])
    methods = list(results.keys())
    final_acc = [results[m]['test_acc'][-1] for m in methods]
    final_time = [results[m]['time'][-1] for m in methods]
    final_samples = [results[m]['samples_seen'][-1] / 1e6 for m in methods]
    
    x = np.arange(len(methods))
    width = 0.25
    
    ax8.bar(x - width, final_acc, width, label='Final Accuracy', alpha=0.8)
    ax8.bar(x, [t/max(final_time) for t in final_time], width, 
           label='Relative Time', alpha=0.8)
    ax8.bar(x + width, [s/max(final_samples) for s in final_samples], width,
           label='Relative Samples', alpha=0.8)
    
    ax8.set_ylabel('Value')
    ax8.set_title('Final Performance Metrics')
    ax8.set_xticks(x)
    ax8.set_xticklabels(methods, rotation=20, ha='right')
    ax8.legend()
    ax8.grid(True, alpha=0.3, axis='y')
    
    plt.savefig('advanced_hybrid_comparison.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    # 打印結果摘要
    print("\n" + "=" * 70)
    print("RESULTS SUMMARY")
    print("=" * 70)
    for name, history in results.items():
        print(f"\n{name}:")
        print(f"  Final Test Acc: {history['test_acc'][-1]:.4f}")
        print(f"  Best Test Acc: {max(history['test_acc']):.4f}")
        print(f"  Total Time: {history['time'][-1]:.1f}s")
        print(f"  Total Samples: {history['samples_seen'][-1]/1e6:.2f}M")
        print(f"  Sample Efficiency: {history['test_acc'][-1]/(history['samples_seen'][-1]/1e6):.4f} acc/M")
        print(f"  Time Efficiency: {history['test_acc'][-1]/(history['time'][-1]/60):.4f} acc/min")
        print(f"  Final Sigma: {history['sigma'][-1]:.6f}")
    
    # 保存結果
    summary = {
        name: {
            'config': configs[name]['description'],
            'final_test_acc': float(history['test_acc'][-1]),
            'best_test_acc': float(max(history['test_acc'])),
            'total_time': float(history['time'][-1]),
            'total_samples': int(history['samples_seen'][-1])
        }
        for name, history in results.items()
    }
    
    with open('advanced_hybrid_results.json', 'w') as f:
        json.dump(summary, f, indent=2)
    
    print("\nResults saved: advanced_hybrid_comparison.png and advanced_hybrid_results.json")

In [None]:
import numpy as np
from tensorflow import keras
from tensorflow.keras.datasets import mnist
import matplotlib.pyplot as plt
import time

def load_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 784).astype('float32') / 255.0
    y_train = keras.utils.to_categorical(y_train, 10)
    y_test = keras.utils.to_categorical(y_test, 10)
    return x_train, y_train, x_test, y_test

class MLP:
    def __init__(self, layers):
        self.layers = layers
        self.weights = []
        self.biases = []
        for i in range(len(layers) - 1):
            w = np.random.randn(layers[i], layers[i+1]) * np.sqrt(2.0 / layers[i])
            b = np.zeros((1, layers[i+1]))
            self.weights.append(w)
            self.biases.append(b)
    
    def relu(self, x):
        return np.maximum(0, x)
    
    def softmax(self, x):
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)
    
    def forward(self, x):
        a = x
        for i in range(len(self.weights) - 1):
            z = np.dot(a, self.weights[i]) + self.biases[i]
            a = self.relu(z)
        z = np.dot(a, self.weights[-1]) + self.biases[-1]
        return self.softmax(z)
    
    def get_params(self):
        params = []
        for w, b in zip(self.weights, self.biases):
            params.append(w.flatten())
            params.append(b.flatten())
        return np.concatenate(params)
    
    def set_params(self, params):
        idx = 0
        for i in range(len(self.weights)):
            w_shape, b_shape = self.weights[i].shape, self.biases[i].shape
            w_size, b_size = np.prod(w_shape), np.prod(b_shape)
            self.weights[i] = params[idx:idx+w_size].reshape(w_shape)
            idx += w_size
            self.biases[i] = params[idx:idx+b_size].reshape(b_shape)
            idx += b_size
    
    def evaluate(self, x, y):
        pred = self.forward(x)
        return np.mean(np.argmax(pred, axis=1) == np.argmax(y, axis=1))
    
    def compute_loss(self, x, y):
        pred = self.forward(x)
        pred = np.clip(pred, 1e-10, 1 - 1e-10)
        return -np.mean(np.sum(y * np.log(pred), axis=1))
    
    def backward(self, x, y):
        m = x.shape[0]
        activations = [x]
        z_values = []
        
        a = x
        for i in range(len(self.weights) - 1):
            z = np.dot(a, self.weights[i]) + self.biases[i]
            z_values.append(z)
            a = self.relu(z)
            activations.append(a)
        
        z = np.dot(a, self.weights[-1]) + self.biases[-1]
        z_values.append(z)
        output = self.softmax(z)
        
        dz = output - y
        gradients_w, gradients_b = [], []
        
        for i in range(len(self.weights) - 1, -1, -1):
            dw = np.dot(activations[i].T, dz) / m
            db = np.sum(dz, axis=0, keepdims=True) / m
            gradients_w.insert(0, dw)
            gradients_b.insert(0, db)
            
            if i > 0:
                dz = np.dot(dz, self.weights[i].T)
                dz[z_values[i-1] <= 0] = 0
        
        return gradients_w, gradients_b
    
    def sgd_update(self, x, y, learning_rate=0.01):
        grad_w, grad_b = self.backward(x, y)
        for i in range(len(self.weights)):
            self.weights[i] -= learning_rate * grad_w[i]
            self.biases[i] -= learning_rate * grad_b[i]
    
    def copy(self):
        """Create a deep copy of the model"""
        new_model = MLP(self.layers)
        new_model.set_params(self.get_params().copy())
        return new_model

class BioInspiredHybridOptimizer:
    """
    生物啟發的混合優化器：
    
    模擬自然演化：
    1. 每一代（Generation）生成多個個體（多尺度變異）
    2. 每個個體在其"生命週期"內學習（SGD優化）
    3. 選擇最佳個體的"基因"（參數）傳遞給下一代
    
    多尺度變異（Multi-Scale Mutation）：
    - 同一代中包含不同幅度的變異（小、中、大）
    - 自適應調整變異範圍
    """
    
    def __init__(self, model,
                 population_size=15,  # 較小族群，每個個體都會被優化
                 num_sigma_levels=3,   # 多尺度變異層級
                 initial_sigma=0.05,   # 初始變異範圍（較小）
                 max_sgd_lr=0.01,
                 min_sgd_lr=0.001,
                 sgd_steps_per_life=200,  # 每個個體的"生命長度"
                 batch_size=128):
        
        self.base_model = model
        self.pop_size = population_size
        self.num_sigma_levels = num_sigma_levels
        self.sigma = initial_sigma
        self.max_sgd_lr = max_sgd_lr
        self.min_sgd_lr = min_sgd_lr
        self.sgd_steps = sgd_steps_per_life
        self.batch_size = batch_size
        self.param_size = len(model.get_params())
        
        # Evolution path for CMA-like adaptation
        self.evolution_path = np.zeros(self.param_size)
        self.path_decay = 0.8
        
        print(f"\n生物啟發混合優化器配置:")
        print(f"  族群大小: {self.pop_size}")
        print(f"  變異層級: {self.num_sigma_levels} (小/中/大突變)")
        print(f"  每個體生命週期: {self.sgd_steps} SGD步驟")
        print(f"  初始變異範圍: {self.sigma}")
    
    def generate_multi_scale_population(self, parent_params):
        """
        生成多尺度變異的族群
        每一代包含不同範圍的突變，模擬自然界的多樣性
        """
        population = []
        sigma_used = []
        
        # 計算不同尺度的sigma
        sigma_levels = [
            self.sigma * 0.3,   # 小突變（微調）
            self.sigma * 1.0,   # 中等突變（探索）
            self.sigma * 3.0    # 大突變（跳出局部最優）
        ]
        
        individuals_per_level = self.pop_size // self.num_sigma_levels
        
        for level_idx, sigma_level in enumerate(sigma_levels):
            for _ in range(individuals_per_level):
                # 生成擾動
                noise = np.random.randn(self.param_size)
                offspring_params = parent_params + sigma_level * noise
                
                population.append(offspring_params)
                sigma_used.append(sigma_level)
        
        # 填充剩餘個體（如果pop_size不能被level數整除）
        while len(population) < self.pop_size:
            noise = np.random.randn(self.param_size)
            offspring_params = parent_params + self.sigma * noise
            population.append(offspring_params)
            sigma_used.append(self.sigma)
        
        return population, sigma_used
    
    def live_and_learn(self, individual_params, x_train, y_train, gen_progress):
        """
        個體的"生命週期"：在其基因基礎上通過經驗學習（SGD）
        
        類比：
        - individual_params: 遺傳的基因
        - SGD: 個體一生的學習和適應
        - 返回值: 個體達到的最佳狀態
        """
        # 創建個體模型
        individual = self.base_model.copy()
        individual.set_params(individual_params)
        
        # 學習率衰減（模擬從年輕到年老的學習能力）
        lr_schedule = lambda step: self.min_sgd_lr + \
            (self.max_sgd_lr - self.min_sgd_lr) * (1 - step / self.sgd_steps)
        
        # 在生命週期內學習
        best_fitness = 0
        best_params = individual_params.copy()
        
        for step in range(self.sgd_steps):
            # 隨機採樣訓練數據
            idx = np.random.choice(len(x_train), self.batch_size, replace=False)
            x_batch = x_train[idx]
            y_batch = y_train[idx]
            
            # 學習（SGD更新）
            current_lr = lr_schedule(step)
            individual.sgd_update(x_batch, y_batch, current_lr)
            
            # 定期評估（不是每步都評估，節省時間）
            if step % 50 == 0 or step == self.sgd_steps - 1:
                fitness = individual.evaluate(x_batch, y_batch)
                if fitness > best_fitness:
                    best_fitness = fitness
                    best_params = individual.get_params().copy()
        
        return best_params, best_fitness
    
    def update_evolution_path(self, selected_direction):
        """更新演化路徑（類似CMA-ES）"""
        self.evolution_path = self.path_decay * self.evolution_path + \
                              (1 - self.path_decay) * selected_direction
    
    def adapt_sigma(self, fitness_improvements):
        """
        根據族群的適應度改進情況調整變異範圍
        如果改進良好→減小sigma（利用）
        如果停滯→增大sigma（探索）
        """
        avg_improvement = np.mean(fitness_improvements)
        
        if avg_improvement > 0.02:  # 顯著進步
            self.sigma *= 0.95
        elif avg_improvement < 0.005:  # 停滯
            self.sigma *= 1.05
        
        # 限制範圍
        self.sigma = np.clip(self.sigma, 0.001, 0.3)
    
    def train(self, x_train, y_train, x_test, y_test, generations=50):
        """
        主訓練循環：模擬多代演化
        """
        history = {
            'train_acc': [], 'test_acc': [], 'train_loss': [],
            'best_individual_acc': [], 'time': [], 'samples_seen': [],
            'sigma': [], 'population_diversity': []
        }
        
        # 初始"祖先"
        best_parent_params = self.base_model.get_params()
        prev_best_fitness = 0
        
        total_samples = 0
        start_time = time.time()
        
        print(f"\n{'='*70}")
        print(f"開始演化訓練 ({generations} 代)")
        print(f"{'='*70}\n")
        
        for gen in range(generations):
            gen_start = time.time()
            gen_progress = gen / generations
            
            # 1. 生成多尺度變異的後代
            population, sigma_used = self.generate_multi_scale_population(best_parent_params)
            
            # 2. 每個個體"活著並學習"
            print(f"Generation {gen+1}/{generations} - 個體學習中...", end='', flush=True)
            
            fitness_list = []
            learned_population = []
            initial_fitness = []
            
            for idx, individual_params in enumerate(population):
                # 記錄初始適應度（遺傳的基因）
                temp_model = self.base_model.copy()
                temp_model.set_params(individual_params)
                eval_idx = np.random.choice(len(x_train), 500, replace=False)
                init_fit = temp_model.evaluate(x_train[eval_idx], y_train[eval_idx])
                initial_fitness.append(init_fit)
                
                # 個體學習
                learned_params, final_fitness = self.live_and_learn(
                    individual_params, x_train, y_train, gen_progress
                )
                
                learned_population.append(learned_params)
                fitness_list.append(final_fitness)
                total_samples += self.sgd_steps * self.batch_size
                
                if (idx + 1) % 5 == 0:
                    print(f".", end='', flush=True)
            
            print(" 完成")
            
            # 3. 選擇（自然選擇：最適者生存）
            best_idx = np.argmax(fitness_list)
            best_offspring_params = learned_population[best_idx]
            best_fitness = fitness_list[best_idx]
            best_individual_acc = fitness_list[best_idx]
            
            # 計算學習帶來的提升
            fitness_improvements = [fitness_list[i] - initial_fitness[i] 
                                   for i in range(len(fitness_list))]
            avg_improvement = np.mean(fitness_improvements)
            
            # 4. 更新演化路徑
            direction = best_offspring_params - best_parent_params
            self.update_evolution_path(direction)
            
            # 5. 自適應調整sigma
            self.adapt_sigma(fitness_improvements)
            
            # 6. 最佳個體成為下一代的"父母"
            best_parent_params = best_offspring_params
            self.base_model.set_params(best_parent_params)
            
            # 評估
            eval_idx = np.random.choice(len(x_train), 1000, replace=False)
            train_acc = self.base_model.evaluate(x_train[eval_idx], y_train[eval_idx])
            test_acc = self.base_model.evaluate(x_test, y_test)
            train_loss = self.base_model.compute_loss(x_train[eval_idx], y_train[eval_idx])
            
            # 計算族群多樣性
            population_std = np.std([np.linalg.norm(p - best_parent_params) 
                                    for p in learned_population])
            
            # 記錄
            history['train_acc'].append(train_acc)
            history['test_acc'].append(test_acc)
            history['train_loss'].append(train_loss)
            history['best_individual_acc'].append(best_individual_acc)
            history['time'].append(time.time() - start_time)
            history['samples_seen'].append(total_samples)
            history['sigma'].append(self.sigma)
            history['population_diversity'].append(population_std)
            
            gen_time = time.time() - gen_start
            
            print(f"  最佳個體: {best_individual_acc:.4f} | "
                  f"測試準確率: {test_acc:.4f} | "
                  f"平均學習提升: {avg_improvement:.4f} | "
                  f"Sigma: {self.sigma:.4f} | "
                  f"代時間: {gen_time:.1f}s\n")
            
            prev_best_fitness = best_fitness
        
        return history

if __name__ == "__main__":
    print("="*70)
    print("生物啟發混合優化器：多尺度演化 + 個體學習")
    print("="*70)
    
    x_train, y_train, x_test, y_test = load_data()
    
    # 測試不同配置
    configs = {
        'Bio-Hybrid-Fast': {
            'pop_size': 10,
            'sgd_steps': 150,
            'generations': 50,
            'desc': '小族群，快速學習'
        },
        'Bio-Hybrid-Balanced': {
            'pop_size': 15,
            'sgd_steps': 200,
            'generations': 40,
            'desc': '平衡配置'
        },
        'Bio-Hybrid-Deep': {
            'pop_size': 20,
            'sgd_steps': 250,
            'generations': 30,
            'desc': '大族群，深度學習'
        }
    }
    
    results = {}
    
    for name, config in configs.items():
        print(f"\n{'='*70}")
        print(f"測試配置: {name}")
        print(f"說明: {config['desc']}")
        print(f"{'='*70}")
        
        model = MLP([784, 128, 64, 10])
        optimizer = BioInspiredHybridOptimizer(
            model=model,
            population_size=config['pop_size'],
            num_sigma_levels=3,
            initial_sigma=0.05,
            max_sgd_lr=0.01,
            min_sgd_lr=0.001,
            sgd_steps_per_life=config['sgd_steps'],
            batch_size=128
        )
        
        results[name] = optimizer.train(
            x_train, y_train, x_test, y_test,
            generations=config['generations']
        )
    
    # 可視化
    fig = plt.figure(figsize=(20, 12))
    gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3)
    
    # Test Accuracy
    ax1 = fig.add_subplot(gs[0, 0])
    for name, history in results.items():
        ax1.plot(history['test_acc'], label=name, linewidth=2, marker='o', markersize=3)
    ax1.set_xlabel('Generation')
    ax1.set_ylabel('Test Accuracy')
    ax1.set_title('Test Accuracy vs Generation')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Time Efficiency
    ax2 = fig.add_subplot(gs[0, 1])
    for name, history in results.items():
        ax2.plot(history['time'], history['test_acc'], label=name, linewidth=2)
    ax2.set_xlabel('Time (seconds)')
    ax2.set_ylabel('Test Accuracy')
    ax2.set_title('Learning Curve (Time)')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    # Sample Efficiency
    ax3 = fig.add_subplot(gs[0, 2])
    for name, history in results.items():
        samples_m = np.array(history['samples_seen']) / 1e6
        ax3.plot(samples_m, history['test_acc'], label=name, linewidth=2)
    ax3.set_xlabel('Samples Seen (Millions)')
    ax3.set_ylabel('Test Accuracy')
    ax3.set_title('Sample Efficiency')
    ax3.legend()
    ax3.grid(True, alpha=0.3)
    
    # Sigma Evolution (Multi-scale adaptation)
    ax4 = fig.add_subplot(gs[1, 0])
    for name, history in results.items():
        ax4.plot(history['sigma'], label=name, linewidth=2)
    ax4.set_xlabel('Generation')
    ax4.set_ylabel('Sigma (Mutation Range)')
    ax4.set_title('Adaptive Mutation Range')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    ax4.set_yscale('log')
    
    # Population Diversity
    ax5 = fig.add_subplot(gs[1, 1])
    for name, history in results.items():
        ax5.plot(history['population_diversity'], label=name, linewidth=2, alpha=0.7)
    ax5.set_xlabel('Generation')
    ax5.set_ylabel('Population Diversity (Std of Params)')
    ax5.set_title('Population Diversity Over Time')
    ax5.legend()
    ax5.grid(True, alpha=0.3)
    
    # Training Loss
    ax6 = fig.add_subplot(gs[1, 2])
    for name, history in results.items():
        ax6.plot(history['train_loss'], label=name, linewidth=2)
    ax6.set_xlabel('Generation')
    ax6.set_ylabel('Training Loss')
    ax6.set_title('Training Loss')
    ax6.legend()
    ax6.grid(True, alpha=0.3)
    ax6.set_yscale('log')
    
    # Learning Impact (Best Individual vs Final)
    ax7 = fig.add_subplot(gs[2, 0])
    for name, history in results.items():
        improvements = [history['test_acc'][i] - history['best_individual_acc'][i] 
                       for i in range(len(history['test_acc']))]
        ax7.plot(improvements, label=name, linewidth=2, alpha=0.7)
    ax7.set_xlabel('Generation')
    ax7.set_ylabel('Learning Improvement')
    ax7.set_title('Within-Life Learning Impact')
    ax7.axhline(y=0, color='black', linestyle='--', alpha=0.3)
    ax7.legend()
    ax7.grid(True, alpha=0.3)
    
    # Convergence Comparison
    ax8 = fig.add_subplot(gs[2, 1:])
    methods = list(results.keys())
    final_acc = [results[m]['test_acc'][-1] for m in methods]
    best_acc = [max(results[m]['test_acc']) for m in methods]
    final_time = [results[m]['time'][-1] for m in methods]
    total_samples = [results[m]['samples_seen'][-1] / 1e6 for m in methods]
    
    x = np.arange(len(methods))
    width = 0.2
    
    ax8.bar(x - 1.5*width, final_acc, width, label='Final Test Acc', alpha=0.8)
    ax8.bar(x - 0.5*width, best_acc, width, label='Best Test Acc', alpha=0.8)
    ax8.bar(x + 0.5*width, [t/max(final_time) for t in final_time], width,
           label='Relative Time', alpha=0.8)
    ax8.bar(x + 1.5*width, [s/max(total_samples) for s in total_samples], width,
           label='Relative Samples', alpha=0.8)
    
    ax8.set_ylabel('Value')
    ax8.set_title('Final Performance Comparison')
    ax8.set_xticks(x)
    ax8.set_xticklabels(methods, rotation=15, ha='right')
    ax8.legend()
    ax8.grid(True, alpha=0.3, axis='y')
    
    plt.savefig('bio_inspired_hybrid.png', dpi=150, bbox_inches='tight')
    plt.show()
    
    # 結果摘要
    print("\n" + "="*70)
    print("最終結果摘要")
    print("="*70)
    for name, history in results.items():
        print(f"\n{name}:")
        print(f"  最終測試準確率: {history['test_acc'][-1]:.4f}")
        print(f"  最佳測試準確率: {max(history['test_acc']):.4f}")
        print(f"  總訓練時間: {history['time'][-1]:.1f}秒")
        print(f"  總樣本數: {history['samples_seen'][-1]/1e6:.2f}M")
        print(f"  最終Sigma: {history['sigma'][-1]:.6f}")
        print(f"  樣本效率: {history['test_acc'][-1]/(history['samples_seen'][-1]/1e6):.4f} acc/M")