In [15]:
import numpy as np
import time
import itertools
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_digits 
from sklearn.metrics import accuracy_score
from scipy.special import expit as sigmoid, logit
from typing import List, Tuple, Optional, Dict

# ==============================================================================
# PART 0: CORE FRAMEWORK FUNCTIONS (Self-Contained)
# NOTE: This section includes the necessary functions for the HC logic 
#       (SigmoidWarping, CompressionManager, detect_basin, HC functions).
#       These are taken from the user's previous context/code.
# ==============================================================================

# --- Sigmoid Warping and Compression Metadata ---

class SigmoidWarping:
    def __init__(self, node_index, length, steepness=5.0):
        self.node_start = int(node_index)
        self.length = int(length)
        self.node_end = self.node_start + self.length
        self.steepness = float(steepness)
        self.shift = self.length - 1 

    def forward(self, node):
        node = np.atleast_1d(node).astype(float)
        position = np.zeros(len(node), dtype=float)
        for i, n in enumerate(node):
            if n < self.node_start: position[i] = float(n)
            elif n > self.node_end: position[i] = float(n) - self.shift
            else:
                if n == self.node_start: position[i] = float(self.node_start)
                elif n == self.node_end: position[i] = float(self.node_start + 1)
                else:
                    t = (n - self.node_start) / self.length
                    x = self.steepness * (t - 0.5)
                    position[i] = self.node_start + sigmoid(x)
        return position[0] if len(node) == 1 else position

    def inverse(self, position):
        position = np.atleast_1d(position).astype(float)
        node = np.zeros(len(position), dtype=int)
        for i, pos in enumerate(position):
            if pos < self.node_start: node[i] = int(np.floor(pos))
            elif pos >= self.node_start + 1.0: node[i] = int(np.ceil(pos + self.shift))
            else:
                s = np.clip(pos - self.node_start, 0.0, 1.0)
                if s <= 0.01: node[i] = self.node_start
                elif s >= 0.99: node[i] = self.node_end
                else:
                    x = logit(s)
                    t = x / self.steepness + 0.5
                    node_f = self.node_start + t * self.length
                    node[i] = int(round(node_f))
        return node[0] if len(node) == 1 else node

class MetadataCompressionOriginalSpace:
    def __init__(self, compressions_x_space=None, steepness=5.0):
        self.metadata_x = sorted(compressions_x_space or [], key=lambda x: x[0])
        self.steepness = float(steepness)
        self.warpings = []
        if self.metadata_x: self._build_warpings()

    def _build_warpings(self):
        cumulative_shift = 0
        for i, (x_start, x_length) in enumerate(self.metadata_x):
            z_start = x_start - cumulative_shift
            z_length = x_length
            warping = SigmoidWarping(z_start, z_length, self.steepness)
            self.warpings.append(warping)
            cumulative_shift += (z_length - 1)

    def forward(self, node):
        position = node
        for warping in self.warpings: position = warping.forward(position)
        return position

    def inverse(self, position):
        node = position
        for warping in reversed(self.warpings): node = warping.inverse(node)
        return node

def detect_compression_basin(fitness_func, local_min_x, max_search=100):
    local_min_fitness = fitness_func(local_min_x)
    
    # LEFT search logic...
    left_boundary = local_min_x
    farthest_left_equal = local_min_x
    found_left_equal = False
    found_left_exit = False
    for i in range(1, max_search + 1):
        current_x = local_min_x - i
        current_fitness = fitness_func(current_x)
        if abs(current_fitness - local_min_fitness) < 1e-9: farthest_left_equal = current_x; found_left_equal = True
        elif current_fitness > local_min_fitness: pass
        else: left_boundary = current_x + 1; found_left_exit = True; break
    else: left_boundary = local_min_x - max_search
    if not found_left_exit and found_left_equal: left_boundary = farthest_left_equal

    # RIGHT search logic...
    right_boundary = local_min_x
    farthest_right_equal = local_min_x
    found_right_equal = False
    found_right_exit = False
    for i in range(1, max_search + 1):
        current_x = local_min_x + i
        current_fitness = fitness_func(current_x)
        if abs(current_fitness - local_min_fitness) < 1e-9: farthest_right_equal = current_x; found_right_equal = True
        elif current_fitness > local_min_fitness: pass
        else: right_boundary = current_x - 1; found_right_exit = True; break
    else: right_boundary = local_min_x + max_search
    if not found_right_exit and found_right_equal: right_boundary = farthest_right_equal

    basin_length = right_boundary - left_boundary + 1
    if basin_length < 2: return None
    if not (found_left_equal or found_right_equal or found_left_exit or found_right_exit): return None
    return (left_boundary, basin_length)

def merge_overlapping_compressions(compressions):
    if not compressions: return []
    sorted_comps = sorted(compressions, key=lambda x: x[0])
    merged = []
    for start, length in sorted_comps:
        end = start + length - 1
        if not merged: merged.append((start, length)); continue
        last_start, last_length = merged[-1]
        last_end = last_start + last_length - 1
        if start <= last_end + 1:
            new_start = min(start, last_start)
            new_end = max(end, last_end)
            new_length = new_end - new_start + 1
            merged[-1] = (new_start, new_length)
        else: merged.append((start, length))
    return merged

class CompressionManagerND:
    def __init__(self, dim, steepness=5.0):
        self.dim = dim
        self.steepness = float(steepness)
        self.dim_compressions = [{} for _ in range(dim)]
        self.dim_systems = [{} for _ in range(dim)]
    
    def update_dimension(self, vary_dim, fixed_coords, basin):
        if basin is None: return
        comps = self.dim_compressions[vary_dim].get(fixed_coords, [])
        comps.append(basin)
        comps = merge_overlapping_compressions(comps)
        self.dim_compressions[vary_dim][fixed_coords] = comps
        self.dim_systems[vary_dim][fixed_coords] = MetadataCompressionOriginalSpace(comps, self.steepness)
    
    def get_system(self, vary_dim, fixed_coords):
        return self.dim_systems[vary_dim].get(fixed_coords, None)

def detect_basin_along_dimension(fitness_func_nd, point, vary_dim, max_search=100):
    def f1d(val):
        new_point = list(point)
        new_point[vary_dim] = int(val)
        return fitness_func_nd(tuple(new_point))
    
    basin = detect_compression_basin(f1d, local_min_x=int(point[vary_dim]), max_search=max_search)
    
    min_val = int(point[vary_dim])
    min_f = fitness_func_nd(point)
    
    if basin:
        start, length = basin
        print(f"    [Dim {vary_dim}] (1D Detect @ x={min_val}, f={min_f:.4f}) -> Basin: [{start}, {start+length-1}] (len={length})")
    else:
        print(f"    [Dim {vary_dim}] (1D Detect @ x={min_val}, f={min_f:.4f}) -> No compressible basin found.")
        
    return basin

def hill_climb_with_compression_nd(fitness_func_nd, start_point, dim, max_iterations=10, basin_max_search=100, global_min_threshold=1e-6):
    traj = []
    cm = CompressionManagerND(dim, steepness=5.0)
    point = tuple(int(x) for x in start_point)
    f = fitness_func_nd(point)
    traj.append((point, f, False))
    
    print(f"\nüöÄ {dim}D COMPRESSION climb start at {point}, f={f:.4f}\n")

    for it in range(max_iterations):
        print(f"================================================================================")
        print(f"üîÑ Iteration {it+1}/{max_iterations}")
        print(f"================================================================================")
        
        step_count = 0
        while True:
            candidates = []
            # 1. O(D) Axis-aligned Neighbors
            for d in range(dim):
                fixed_coords = tuple(point[i] for i in range(dim) if i != d)
                comp_sys = cm.get_system(d, fixed_coords)
                if comp_sys is not None:
                    z = comp_sys.forward(point[d])
                    nm, np_ = comp_sys.inverse(z - 1), comp_sys.inverse(z + 1)
                else:
                    nm, np_ = point[d] - 1, point[d] + 1
                
                pm = list(point); pm[d] = nm
                pp = list(point); pp[d] = np_
                candidates.append((tuple(pm), fitness_func_nd(tuple(pm))))
                candidates.append((tuple(pp), fitness_func_nd(tuple(pp))))
            
            # 2. O(D^2) Diagonal Neighbors
            for d1, d2 in itertools.combinations(range(dim), 2):
                for o1 in [1, -1]:
                    for o2 in [1, -1]:
                        np_ = list(point); np_[d1] += o1; np_[d2] += o2
                        candidates.append((tuple(np_), fitness_func_nd(tuple(np_))))
            
            best_point, best_f = point, f
            for cp, cf in candidates:
                if cf < best_f - 1e-9: best_point, best_f = cp, cf
            
            if best_f < f - 1e-9:
                point, f = best_point, best_f
                used_comp = any(cm.get_system(d, tuple(point[i] for i in range(dim) if i != d)) is not None for d in range(dim))
                traj.append((point, f, used_comp))
                step_count += 1
            else:
                print(f"  üìç Climbed {step_count} steps, now at {point}, f={f:.6g}")
                break
        
        if abs(f) < global_min_threshold:
            print("\nüéâ SUCCESS: reached near-global minimum")
            break
        
        # Basin Detection
        print(f"\n‚ö†Ô∏è STUCK at local minimum {point}, f={f:.6g}")
        print(f"  üîç Detecting basins along all {dim} dimensions...")
        basins = {}
        for d in range(dim):
            basin = detect_basin_along_dimension(fitness_func_nd, point, d, max_search=basin_max_search)
            if basin:
                fixed_coords = tuple(point[i] for i in range(dim) if i != d)
                cm.update_dimension(d, fixed_coords, basin)
                basins[d] = basin
        
        # Restart Logic
        restart_candidates = []
        for d, (b_start, b_len) in basins.items():
            b_end = b_start + b_len - 1
            rp1 = list(point); rp1[d] = b_start - 1
            rp2 = list(point); rp2[d] = b_end + 1
            restart_candidates.append((tuple(rp1), fitness_func_nd(tuple(rp1))))
            restart_candidates.append((tuple(rp2), fitness_func_nd(tuple(rp2))))
        
        if restart_candidates:
            best_r_p, best_r_f = min(restart_candidates, key=lambda t: t[1])
            if best_r_f < f - 1e-9:
                print(f"  ‚û°Ô∏è Restarting from {best_r_p}, f={best_r_f:.4f}")
                point, f = best_r_p, best_r_f
                traj.append((point, f, True))
            else:
                print("\n  ‚ö†Ô∏è Restart candidates didn't improve fitness. Stopping.")
                break
        else:
            break
            
    print(f"\nüèÅ FINAL {dim}D COMPRESSION RESULTS üèÅ")
    print(f"  Final position: {point}, Final fitness: {f:.6g}, Total steps: {len(traj)}")
    return traj

def hill_climb_simple_nd(fitness_func_nd, start_point, dim, max_steps=2000):
    point = tuple(int(x) for x in start_point)
    f = fitness_func_nd(point)
    traj = [(point, f)]
    print(f"\nüöÄ {dim}D BASELINE climb start at {point}, f={f:.4f}\n")

    for _ in range(max_steps):
        candidates = []
        # 1. O(D)
        for d in range(dim):
            nm = list(point); nm[d] -= 1
            np_ = list(point); np_[d] += 1
            candidates.append((tuple(nm), fitness_func_nd(tuple(nm))))
            candidates.append((tuple(np_), fitness_func_nd(tuple(np_))))
        # 2. O(D^2) Diagonal
        for d1, d2 in itertools.combinations(range(dim), 2):
            for o1 in [1, -1]:
                for o2 in [1, -1]:
                    np_ = list(point); np_[d1] += o1; np_[d2] += o2
                    candidates.append((tuple(np_), fitness_func_nd(tuple(np_))))

        best_point, best_f = point, f
        for cp, cf in candidates:
            if cf < best_f - 1e-9: best_point, best_f = cp, cf
        
        if best_f < f - 1e-9:
            point, f = best_point, best_f
            traj.append((point, f))
        else:
            break
            
    print(f"üèÅ FINAL {dim}D BASELINE RESULTS üèÅ")
    print(f"  Final position: {point}, Final fitness: {f:.6g}, Total steps: {len(traj)}")
    return traj

# ==============================================================================
# 3. RANDOM FOREST HPO IMPLEMENTATION
# ==============================================================================

# üíæ Îç∞Ïù¥ÌÑ∞ Î°úÎìú Î∞è Ï†ÑÏ≤òÎ¶¨ (load_digits ÏÇ¨Ïö©)
# NOTE: load_digitsÎäî HPOÏùò Îπ†Î•∏ Î∞òÎ≥µ ÌÖåÏä§Ìä∏Ïóê Ï†ÅÌï©Ìï©ÎãàÎã§.
try:
    X, y = load_digits(return_X_y=True)
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.3, random_state=42)
    print(f"\n‚úÖ Data Loaded (Digits): Train={len(X_train)} samples, Validation={len(X_val)} samples.")
except Exception as e:
    print(f"\n‚ùå Failed to load data: {e}")
    sys.exit(1)


# ‚≠ê HPO Î™©Ìëú Ìï®Ïàò Ï†ïÏùò (Minimization)
def rf_fitness_nd(point: Tuple[int, int]) -> float:
    """
    Random Forest HPO Objective Function (Minimize 1 - Accuracy).
    
    point[0] (x0) = n_estimators (Range: 10-200)
    point[1] (x1) = max_depth (Range: 2-30)
    """
    
    # 1. Parameter Mapping & Constraint Enforcement
    # The HC framework handles the discrete steps, here we enforce the bounds.
    n_estimators = max(10, min(200, int(point[0])))
    max_depth = max(2, min(30, int(point[1])))
    
    # 2. Model Training & Evaluation
    try:
        model = RandomForestClassifier(
            n_estimators=n_estimators, 
            max_depth=max_depth,
            random_state=42, 
            n_jobs=-1  # Use all cores for fast training
        )
        model.fit(X_train, y_train)
        
        y_pred = model.predict(X_val)
        accuracy = accuracy_score(y_val, y_pred)
        
        # Fitness = 1 - Accuracy (Minimization Objective)
        fitness = 1.0 - accuracy
        
    except Exception as e:
        # Penalize invalid/failed attempts severely
        fitness = 1.0 

    return fitness


# ==============================================================================
# 4. MAIN EXECUTION (Comparison and Analysis)
# ==============================================================================

# HPO Search Space Boundaries & Start Point
DIM = 2
START_POINT_HPO = (100, 3)  # Start at n_estimators=20, max_depth=10
GLOBAL_MIN_F = 0.0          # Target: 100% Accuracy
THRESHOLD = 0.01            # Acceptable fitness target (e.g., 95% Acc)

print("\n" + "="*80)
print("üéØ Random Forest HPO Comparison (D=2)")
print(f"   Parameters: (n_estimators, max_depth)")
print(f"   Search Space: [10-200] x [2-30]")
print(f"   Start Point: {START_POINT_HPO}")
print("="*80)

# --- 1. Run with Compression HC ---
time_start_comp = time.time()
traj_comp = hill_climb_with_compression_nd(
    fitness_func_nd=rf_fitness_nd,
    start_point=START_POINT_HPO,
    dim=DIM,
    max_iterations=5,          # Ï†úÌïúÎêú ÏïïÏ∂ï/Ïû¨ÏãúÏûë ÌöüÏàò
    basin_max_search=20,
    global_min_threshold=THRESHOLD
)
time_end_comp = time.time()

# --- 2. Run Baseline HC ---
time_start_base = time.time()
traj_baseline = hill_climb_simple_nd(
    fitness_func_nd=rf_fitness_nd,
    start_point=START_POINT_HPO,
    dim=DIM,
    max_steps=500  # Ï∂©Î∂ÑÌïú ÌÉêÏÉâ Ïä§ÌÖù Î∂ÄÏó¨
)
time_end_base = time.time()

# ==============================================================================
# 5. FINAL SUMMARY
# ==============================================================================

def print_final_summary(traj, time_taken, label):
    if not traj:
        print(f"  {label:<30}: FAILED (No steps)")
        return 0.0
    
    final_f = traj[-1][1]
    final_p = traj[-1][0]
    accuracy = 1.0 - final_f
    steps = len(traj)
    
    print(f"  {label:<30}:")
    print(f"    - Final Params (N, D): {str(final_p):<25}")
    print(f"    - Final Fitness (Min): {final_f:<25.6f}")
    print(f"    - Final Accuracy:      {accuracy:<25.6f}")
    print(f"    - Total Steps:         {steps:<25}")
    print(f"    - Time Taken (s):      {time_taken:<25.4f}")
    
    return accuracy

print("\n" + "=="*40)
print("        FINAL RANDOM FOREST HPO COMPARISON")
print("=="*40)

acc_comp = print_final_summary(traj_comp, time_end_comp - time_start_comp, "Compression Hill Climbing")
acc_base = print_final_summary(traj_baseline, time_end_base - time_start_base, "Baseline Hill Climbing")

print("\n--- HPO Efficacy ---")
if acc_comp > acc_base + 1e-4:
    print(f"üéâ **Compression HC Succeeded!** (Accuracy improvement: {acc_comp - acc_base:.4f})")
else:
    print("‚ö†Ô∏è Baseline and Compression HC performed similarly.")


‚úÖ Data Loaded (Digits): Train=1257 samples, Validation=540 samples.

üéØ Random Forest HPO Comparison (D=2)
   Parameters: (n_estimators, max_depth)
   Search Space: [10-200] x [2-30]
   Start Point: (100, 3)

üöÄ 2D COMPRESSION climb start at (100, 3), f=0.1130

üîÑ Iteration 1/5
  üìç Climbed 4 steps, now at (100, 7), f=0.0333333

‚ö†Ô∏è STUCK at local minimum (100, 7), f=0.0333333
  üîç Detecting basins along all 2 dimensions...
    [Dim 0] (1D Detect @ x=100, f=0.0333) -> Basin: [95, 101] (len=7)
    [Dim 1] (1D Detect @ x=7, f=0.0333) -> Basin: [-13, 8] (len=22)
  ‚û°Ô∏è Restarting from (100, 9), f=0.0259
üîÑ Iteration 2/5
  üìç Climbed 1 steps, now at (100, np.int64(10)), f=0.0222222

‚ö†Ô∏è STUCK at local minimum (100, np.int64(10)), f=0.0222222
  üîç Detecting basins along all 2 dimensions...
    [Dim 0] (1D Detect @ x=100, f=0.0222) -> Basin: [94, 120] (len=27)
    [Dim 1] (1D Detect @ x=10, f=0.0222) -> Basin: [-10, 12] (len=23)

  ‚ö†Ô∏è Restart candidates didn't 