# In [4]:
import gym
from gym import spaces
import numpy as np
import time

try:
    # Import the pipeline execution function (assumes pipeline.py defines run_pipeline)
    from pipeline import run_pipeline
except ImportError:
    # If pipeline is a module in the same package
    from .pipeline import run_pipeline

class PipelineEnv(gym.Env):
    """
    Gym environment for tuning a machine-learning pipeline with RL.

    Each action picks one pipeline node, one method for that node and a
    normalized hyperparameter vector.  The environment writes that choice
    into ``pipeline_config``, re-runs the pipeline through ``run_pipeline``
    and rewards the agent with ``-val_mae - cost_penalty * cost``.

    Observations are dictionaries holding the step counter, the remaining
    budget, the last action, per-node "has been selected" flags, the latest
    validation metrics, the cost of the last run and the current feature
    count (plus cumulative method usage counts when ``include_stats`` is
    set).

    API note: ``step()`` returns the 5-tuple
    ``(obs, reward, terminated, truncated, info)`` while ``reset()`` returns
    only the observation.  This mix is kept for backward compatibility with
    existing callers.
    """
    metadata = {"render.modes": ["human"]}

    # Reward returned when a step is aborted or the budget is overshot.
    _PENALTY_REWARD = -100.0
    # The validation MAE must drop by more than this margin to count as progress.
    _MIN_IMPROVEMENT = 0.01
    # Number of consecutive non-improving steps after which the episode terminates.
    _PATIENCE = 10
    # Tolerance used when comparing the remaining budget against zero.
    _BUDGET_TOL = 1e-9

    def __init__(self, pipeline_nodes=None, total_budget=100.0, max_steps=50,
                 cost_penalty=0.0, include_stats=False):
        """
        Initialize the PipelineEnv.

        Note that construction ends with a call to ``reset()``, which runs the
        pipeline once with the default configuration.

        Parameters:
            pipeline_nodes (list or None): Sequence of node identifiers to include in the pipeline.
                                           If None, use the default full pipeline node list.
            total_budget (float): Total allowable cost (e.g. time budget) for an episode.
            max_steps (int): Maximum number of steps (actions) per episode.
            cost_penalty (float): Lambda coefficient for cost in the reward function.
            include_stats (bool): If True, include cumulative method usage counts in the state.
        """
        super().__init__()

        # Copy the caller's sequence so later external mutation cannot desynchronize
        # num_nodes / node_index from the node list.
        if pipeline_nodes is None:
            self.pipeline_nodes = ['N0', 'N1', 'N2', 'N3', 'N4', 'N5']  # default full pipeline
        else:
            self.pipeline_nodes = list(pipeline_nodes)
        self.num_nodes = len(self.pipeline_nodes)

        # Available methods per node (this should align with pipeline.py capabilities).
        method_catalogue = {
            'N0': ['api'],
            'N1': ['mean', 'knn', 'median'],
            'N2': ['default'],
            'N3': ['none', 'variance'],
            'N4': ['std', 'robust'],
            'N5': ['rf', 'gbr']
        }
        # Keep only the nodes that are part of this pipeline; unknown nodes get no methods.
        self.methods_for_node = {
            node: method_catalogue[node]
            for node in self.pipeline_nodes if node in method_catalogue
        }

        self.node_index = {node: idx for idx, node in enumerate(self.pipeline_nodes)}
        # The method component of the action is sized for the node with the most methods;
        # the index is interpreted relative to the chosen node's own list in step().
        self.max_methods = (
            max(len(methods) for methods in self.methods_for_node.values())
            if self.methods_for_node else 0
        )

        # Unified hyperparameter vector length (the widest mapping, 'rf', reads two entries).
        self.hyperparam_dim = 2

        # Action: (node index, method index, normalized hyperparameter vector).
        self.action_space = spaces.Tuple((
            spaces.Discrete(self.num_nodes),
            spaces.Discrete(self.max_methods),
            spaces.Box(low=0.0, high=1.0, shape=(self.hyperparam_dim,), dtype=np.float32)
        ))

        # Offsets of each node's methods inside the flat usage-count vector, ordered as
        # [count(node0_method0), count(node0_method1), ..., count(node1_method0), ...].
        self.method_index_offset = {}
        total_method_options = 0
        for node in self.pipeline_nodes:
            self.method_index_offset[node] = total_method_options
            total_method_options += len(self.methods_for_node.get(node, []))
        self._total_method_options = total_method_options

        obs_spaces = {
            "step": spaces.Box(low=0, high=max_steps, shape=(1,), dtype=np.int32),
            "remaining_budget": spaces.Box(low=0.0, high=total_budget, shape=(1,), dtype=np.float32),
            # last_node / last_method are emitted as shape-(1,) integer arrays, so they are
            # declared as one-element MultiDiscrete spaces (a Discrete space only contains scalars).
            "last_node": spaces.MultiDiscrete([self.num_nodes], dtype=np.int32),
            "last_method": spaces.MultiDiscrete([self.max_methods], dtype=np.int32),
            "last_hyperparams": spaces.Box(low=0.0, high=1.0, shape=(self.hyperparam_dim,), dtype=np.float32),
            "node_selected": spaces.MultiBinary(self.num_nodes),
            "val_mae": spaces.Box(low=0.0, high=np.inf, shape=(1,), dtype=np.float32),
            "val_r2": spaces.Box(low=-np.inf, high=1.0, shape=(1,), dtype=np.float32),
            "last_runtime": spaces.Box(low=0.0, high=np.inf, shape=(1,), dtype=np.float32),
            # An integer Box cannot represent an infinite bound; use the dtype's maximum.
            "current_n_features": spaces.Box(low=0, high=np.iinfo(np.int32).max, shape=(1,), dtype=np.int32)
        }
        self.include_stats = include_stats
        if include_stats:
            obs_spaces["method_counts"] = spaces.Box(
                low=0, high=max_steps, shape=(total_method_options,), dtype=np.int32)

        self.observation_space = spaces.Dict(obs_spaces)

        # Save configuration parameters
        self.total_budget = total_budget
        self.max_steps = max_steps
        self.cost_penalty = cost_penalty

        # Dynamic state; everything below is (re)initialized by reset().
        self.step_count = 0
        self.remaining_budget = None
        self.pipeline_config = None    # dict of node -> {"method": ..., "params": ...}
        self.last_action = None        # (node index, method index, hyperparameter vector)
        self.best_val_mae = None
        self.no_improve_steps = None
        self.last_action_metrics = {}  # latest {"val_mae", "val_r2"}, read by render()
        self._node_selected = np.zeros(self.num_nodes, dtype=np.int8)
        self._method_counts = np.zeros(self._total_method_options, dtype=np.int32)
        self._last_obs = None

        # Reset environment to initial state
        self.reset()

    def reset(self):
        """
        Reset the environment to an initial state at the start of an episode.

        Every node is set to the first method of its list with empty params,
        and the pipeline is run once to obtain the starting metrics.  A
        failure of that run is reported with a printed warning and the
        metrics fall back to 0.0.

        Returns:
            dict: The initial observation (observation only, no info dict).
        """
        self.step_count = 0
        self.remaining_budget = float(self.total_budget)
        self.no_improve_steps = 0
        self.best_val_mae = float('inf')
        # "No action yet" marker; the observation reports index 0 for negative indices.
        self.last_action = (-1, -1, np.zeros(self.hyperparam_dim, dtype=np.float32))
        self._node_selected = np.zeros(self.num_nodes, dtype=np.int8)
        # Initial defaults are not counted as agent choices, so the counts start at zero.
        self._method_counts = np.zeros(self._total_method_options, dtype=np.int32)

        self.pipeline_config = {}
        for node in self.pipeline_nodes:
            methods = self.methods_for_node.get(node, [])
            self.pipeline_config[node] = {
                "method": methods[0] if methods else None,
                "params": {}  # filled in once the agent configures the node
            }

        metrics, intermediates = {}, None
        if self.pipeline_config:
            try:
                result = run_pipeline(self.pipeline_config, return_intermediates=True)
            except Exception as e:
                # Best effort: the episode can still start without initial metrics.
                print("Warning: run_pipeline failed during reset:", e)
                result = None
            metrics, intermediates = self._split_result(result)

        initial_val_mae, initial_val_r2 = self._extract_scores(metrics)
        self.last_action_metrics = {"val_mae": initial_val_mae, "val_r2": initial_val_r2}

        return self._get_obs(val_mae=initial_val_mae, val_r2=initial_val_r2,
                             runtime=0.0, intermediates=intermediates)

    def step(self, action):
        """
        Execute one step in the environment with the given action.

        Parameters:
            action: Tuple ``(node_index, method_index, hyperparam_vector)``.  The
                method index is relative to the chosen node's method list; the
                vector must have ``hyperparam_dim`` finite entries and is clipped
                to [0, 1].

        Returns:
            tuple: ``(obs, reward, terminated, truncated, info)``.  Illegal actions
            and pipeline failures end the episode with ``truncated=True``, a
            reward of -100 and ``info["error"]`` set; in that case the step
            counter and budget are left untouched and the last observation is
            returned again.
        """
        node_idx, method_idx, hyper_vector = action
        # Accept numpy integers / one-element arrays as produced by sampled actions.
        node_idx = int(node_idx)
        method_idx = int(method_idx)

        if not 0 <= node_idx < self.num_nodes:
            return self._abort_step("Invalid node selected")
        node = self.pipeline_nodes[node_idx]

        methods = self.methods_for_node.get(node, [])
        if not 0 <= method_idx < len(methods):
            return self._abort_step(f"Invalid method selected for node {node}")
        method = methods[method_idx]

        hyper_vector = np.asarray(hyper_vector, dtype=np.float32).ravel()
        if hyper_vector.size != self.hyperparam_dim or not np.all(np.isfinite(hyper_vector)):
            return self._abort_step("Invalid hyperparameter vector")
        # Keep the vector inside the declared action/observation bounds.
        hyper_vector = np.clip(hyper_vector, 0.0, 1.0)

        # Map hyperparameter vector [0,1]^d to actual hyperparameter values for this method
        params = self._map_hyperparams(node, method, hyper_vector)
        self.pipeline_config[node] = {"method": method, "params": params}

        # Run the pipeline with the new configuration; wall-clock time is the default cost.
        # perf_counter is monotonic, so the measured duration cannot be negative.
        start_time = time.perf_counter()
        try:
            result = run_pipeline(self.pipeline_config, return_intermediates=True)
        except Exception as e:
            return self._abort_step(f"Pipeline execution failed: {e}")
        cost = time.perf_counter() - start_time

        metrics, intermediates = self._split_result(result)

        # Prefer cost figures reported by the pipeline itself when present.
        if isinstance(metrics, dict) and "cost" in metrics:
            # 'cost' could be total or per-node; here assume total cost
            cost = metrics["cost"]
        if isinstance(intermediates, list):
            # Placeholder logic: a per-node "time" entry for the modified node wins.
            for interm in intermediates:
                if isinstance(interm, dict) and interm.get("node") == node and "time" in interm:
                    cost = interm["time"]
                    break

        self.remaining_budget -= cost

        # NOTE(review): a result without "val_mae" is scored as 0.0, i.e. as a perfect
        # run; confirm that run_pipeline always reports the metric.
        val_mae, val_r2 = self._extract_scores(metrics)

        # Reward: negative val_mae minus lambda * cost
        reward = -val_mae - self.cost_penalty * cost

        # Convergence tracking: only a drop larger than the margin resets the counter.
        if val_mae + 1e-9 < self.best_val_mae - self._MIN_IMPROVEMENT:
            self.best_val_mae = val_mae
            self.no_improve_steps = 0
        else:
            self.no_improve_steps += 1

        terminated = False
        truncated = False
        info = {}
        if self.remaining_budget < -self._BUDGET_TOL:
            # Overshooting the budget is an error and overrides any normal termination.
            reward = self._PENALTY_REWARD
            truncated = True
            info["error"] = "budget_exceeded"
        else:
            if self.no_improve_steps >= self._PATIENCE:
                terminated = True
                info["reason"] = "converged_no_improvement"
            if self.remaining_budget <= self._BUDGET_TOL:
                # Budget used up (within tolerance): normal end of the episode.
                terminated = True
                info.setdefault("reason", "budget_exhausted")
        if self.step_count + 1 >= self.max_steps:
            truncated = True
            # Do not overwrite a more specific termination reason.
            info.setdefault("reason", "max_steps_reached")

        # Commit the step to the episode history.
        self.step_count += 1
        self._node_selected[node_idx] = 1
        if self.include_stats:
            self._method_counts[self.method_index_offset.get(node, 0) + method_idx] += 1
        self.last_action_metrics = {"val_mae": val_mae, "val_r2": val_r2}

        obs = self._get_obs(last_node=node_idx, last_method=method_idx, last_hyperparams=hyper_vector,
                            val_mae=val_mae, val_r2=val_r2, runtime=cost, intermediates=intermediates)

        return obs, reward, terminated, truncated, info

    def _abort_step(self, message):
        """Build the step() result for an illegal action or a failed pipeline run."""
        previous = getattr(self, "_last_obs", None)
        if previous is None:
            obs = self._get_obs()
        else:
            # The state did not advance, so hand back a copy of the last observation
            # instead of one whose metrics are zeroed (0 MAE would read as perfect).
            obs = {key: np.copy(value) for key, value in previous.items()}
        return obs, self._PENALTY_REWARD, False, True, {"error": message}

    @staticmethod
    def _split_result(result):
        """Split a run_pipeline result into ``(metrics, intermediates)``."""
        if result is None:
            return {}, None
        if isinstance(result, tuple):
            # Assume (metrics, intermediates); tolerate shorter or longer tuples.
            metrics = result[0] if len(result) > 0 else {}
            intermediates = result[1] if len(result) > 1 else None
            return metrics, intermediates
        return result, None

    @staticmethod
    def _extract_scores(metrics):
        """Return ``(val_mae, val_r2)`` from a metrics dict, defaulting each to 0.0."""
        if isinstance(metrics, dict):
            return metrics.get("val_mae", 0.0), metrics.get("val_r2", 0.0)
        return 0.0, 0.0

    def _count_features(self, intermediates, last_node):
        """
        Best-effort feature count derived from the pipeline intermediates.

        A list/tuple is read through its last element.  A dict is looked up by
        the name of the last selected node, then by its index.
        NOTE(review): the structure of the intermediates is not visible here;
        confirm against run_pipeline.  Returns 0 when nothing usable is found.
        """
        if intermediates is None:
            return 0
        try:
            final_data = None
            if isinstance(intermediates, dict):
                if 0 <= last_node < self.num_nodes:
                    final_data = intermediates.get(self.pipeline_nodes[last_node])
                if final_data is None:
                    final_data = intermediates.get(last_node)
            elif isinstance(intermediates, (list, tuple)) and len(intermediates) > 0:
                final_data = intermediates[-1]

            if isinstance(final_data, dict):
                final_data = final_data.get("data")
            if isinstance(final_data, tuple):
                # e.g. an (X_val, y_val) pair: the features are the first element
                final_data = final_data[0] if final_data else None
            shape = getattr(final_data, "shape", None)
            if not shape:
                return 0
            return int(shape[1]) if len(shape) > 1 else int(shape[0])
        except (AttributeError, IndexError, KeyError, TypeError, ValueError):
            return 0

    def _get_obs(self, last_node=None, last_method=None, last_hyperparams=None,
                 val_mae=0.0, val_r2=0.0, runtime=0.0, intermediates=None):
        """
        Construct the observation dictionary based on current environment state.

        If any of ``last_node`` / ``last_method`` / ``last_hyperparams`` is
        omitted, the stored ``last_action`` is used for all three.  The action
        that ends up in the observation is stored back into ``last_action``.
        Negative node/method indices (no action yet) are reported as 0.
        """
        if last_node is None or last_method is None or last_hyperparams is None:
            if self.last_action is None:
                last_node, last_method = -1, -1
                last_hyperparams = np.zeros(self.hyperparam_dim, dtype=np.float32)
            else:
                last_node, last_method, last_hyperparams = self.last_action
        last_hyperparams = np.array(last_hyperparams, dtype=np.float32)
        self.last_action = (last_node, last_method, last_hyperparams)

        current_n_features = self._count_features(intermediates, last_node)

        obs = {
            "step": np.array([self.step_count], dtype=np.int32),
            # Clamp at zero: after an overshoot the raw value is negative, which would
            # fall outside the declared observation space.
            "remaining_budget": np.array([max(self.remaining_budget, 0.0)], dtype=np.float32),
            "last_node": np.array([last_node if last_node >= 0 else 0], dtype=np.int32),
            "last_method": np.array([last_method if last_method >= 0 else 0], dtype=np.int32),
            "last_hyperparams": last_hyperparams.copy(),
            # Cumulative flags maintained by step(): 1 for every node chosen this episode.
            "node_selected": self._node_selected.astype(np.int8),
            "val_mae": np.array([val_mae], dtype=np.float32),
            "val_r2": np.array([val_r2], dtype=np.float32),
            "last_runtime": np.array([runtime], dtype=np.float32),
            "current_n_features": np.array([current_n_features], dtype=np.int32)
        }
        if self.include_stats:
            # Cumulative usage counts maintained by step().
            obs["method_counts"] = self._method_counts.astype(np.int32)

        # Keep a private copy so an aborted step can return the same observation again.
        self._last_obs = {key: np.copy(value) for key, value in obs.items()}
        return obs

    def _map_hyperparams(self, node, method, hyper_vector):
        """
        Map the normalized hyperparameter vector to actual hyperparameter values
        for the given node and method, using predefined ranges per method.

        Method names are matched case-insensitively.  Entries are clipped to
        [0, 1], non-finite entries are replaced, and missing entries count as 0.
        Methods without a mapping yield an empty dict.  These ranges should
        align with pipeline.py's expectations.
        """
        params = {}
        name = str(method).lower() if method is not None else ""

        vec = np.asarray(hyper_vector, dtype=np.float32).ravel()
        vec = np.clip(np.nan_to_num(vec, nan=0.0, posinf=1.0, neginf=0.0), 0.0, 1.0)
        if vec.size < 2:
            vec = np.pad(vec, (0, 2 - vec.size))  # zero-fill the missing entries
        u0, u1 = vec[0], vec[1]

        if node == 'N1':
            # Imputation ('mean' and 'median' have no hyperparameters)
            if name == 'knn':
                params['n_neighbors'] = int(1 + u0 * 14)  # 1 to 15
            elif name == 'missforest':
                params['max_iter'] = int(5 + u0 * 15)  # 5 to 20
        elif node == 'N2':
            # Feature generation
            if name == 'cgcnn':
                params['embed_dim'] = int(128 + u0 * (512 - 128))  # 128 to 512
        elif node == 'N3':
            # Feature selection ('none' has no hyperparameters)
            # NOTE(review): 'variance' is the name registered in methods_for_node and is
            # assumed to be the variance-threshold selector formerly spelled 'var_thresh';
            # confirm the parameter name against pipeline.py.
            if name in ('variance', 'var_thresh'):
                params['var_ratio'] = float(0.0 + u0 * 0.2)  # 0.0 to 0.2
            elif name == 'pfi':
                params['top_k'] = int(5 + u0 * 45)  # 5 to 50
        elif node == 'N5':
            # Learner
            if name == 'rf':
                params['n_estimators'] = int(50 + u0 * 750)  # 50 to 800
                depth = int(3 + u1 * 27)  # 3 to 30
                # The top of the range (30) stands for "unlimited depth".
                params['max_depth'] = None if depth >= 30 else depth
            elif name == 'gbr':
                params['learning_rate'] = float(0.01 + u0 * (0.3 - 0.01))  # 0.01 to 0.3
        # Other nodes (data fetch, scaling, ...) have no tunable hyperparameters here.
        return params

    def render(self, mode='human'):
        """Render the current state of the environment (print the pipeline configuration and performance)."""
        if mode != 'human':
            return
        print(f"Step {self.step_count}:")
        print("  Pipeline configuration: ")
        for node in self.pipeline_nodes:
            cfg = (self.pipeline_config or {}).get(node, {})
            if not cfg:
                continue
            method = cfg.get("method")
            params = cfg.get("params", {})
            print(f"    {node}: {method} {params}")
        # last_action_metrics is refreshed by reset() and by every successful step().
        latest = getattr(self, "last_action_metrics", None) or {}
        print(f"  Validation MAE: {latest.get('val_mae', 'N/A')}")
        print(f"  Validation R2: {latest.get('val_r2', 'N/A')}")
        print(f"  Remaining Budget: {self.remaining_budget}")

    def get_action_space(self):
        """Return the action space of the environment."""
        return self.action_space

    def get_observation_space(self):
        """Return the observation space of the environment."""
        return self.observation_space

    def get_pipeline_config(self):
        """Get the current pipeline configuration (the live dict, not a copy)."""
        return self.pipeline_config

# End of env.py


### Testing

# Notebook output: TypeError: __init__() missing 1 required positional argument: 'df_train'