# Main

Start here.

# Libraries

Libraries used in the framework.

In [1]:
import os
import ast
import inspect
from collections.abc import Iterable
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.ensemble import BaseEnsemble

# General libraries
import json
import numpy as np
import pandas as pd

# Pipelines
from imblearn.pipeline import Pipeline 
from sklearn.pipeline import make_pipeline
from sklearn.compose import ColumnTransformer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import GridSearchCV, cross_validate, StratifiedKFold

# Model Selection 
from sklearn.model_selection import train_test_split

# Model assessment
from sklearn.metrics import classification_report, confusion_matrix

# Helper Classes

These classes are intended to assist in custom transformations of the data that are consistent with the sklearn methodology, by wrapping known functions, such as np.clip, in a fit - transform schema.

## Wrapper for np.clip()

Allows the implementation of np.clip as a custom transformer.

In [2]:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd

class OutlierClipper(BaseEstimator, TransformerMixin):
    def __init__(self, lower_percentile=0.005, upper_percentile=0.995, use_iqr=False):
        """
        Initialize the OutlierClipper with options for percentile clipping or IQR-based clipping.

        Parameters:
        - lower_percentile: float, lower bound percentile for clipping (if percentiles are used)
        - upper_percentile: float, upper bound percentile for clipping (if percentiles are used)
        - use_iqr: bool, whether to use IQR method for determining bounds
        """
        self.lower_percentile = lower_percentile
        self.upper_percentile = upper_percentile
        self.use_iqr = use_iqr
    
    def fit(self, X, y=None):
        """
        Fit the clipping bounds based on the training dataset using the specified method (percentiles or IQR).

        Parameters:
        - X: numpy.ndarray or pandas.DataFrame, the dataset used for fitting
        - y: ignored, not used for fitting

        Returns:
        - self: fitted instance of the class
        """
        # Convert to DataFrame if input is numpy array
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
        
        # For each column in X, calculate the bounds using the specified method
        self.bounds_ = {}
        for column in X.columns:
            if self.use_iqr:
                q1 = X[column].quantile(0.25)  # 1st quartile
                q3 = X[column].quantile(0.75)  # 3rd quartile
                iqr = q3 - q1  # Interquartile range
                lower_bound = q1 - 1.5 * iqr
                upper_bound = q3 + 1.5 * iqr
            else:
                lower_bound = X[column].quantile(self.lower_percentile)
                upper_bound = X[column].quantile(self.upper_percentile)

            self.bounds_[column] = (lower_bound, upper_bound)

        return self

    def transform(self, X):
        """
        Apply clipping to the dataset based on the fitted bounds.

        Parameters:
        - X: numpy.ndarray or pandas.DataFrame, the dataset to transform

        Returns:
        - X: pandas.DataFrame, the transformed dataset with clipped values
        """
        # Convert to DataFrame if input is numpy array
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X

        # Apply clipping for each column
        for column, (lower_bound, upper_bound) in self.bounds_.items():
            X[column] = X[column].clip(lower=lower_bound, upper=upper_bound)

        return X

    def set_output(self, transform="default"):
        """
        Enable compatibility with scikit-learn's `set_output` functionality.

        Parameters:
        - transform: str, the output format ("default" or "pandas").
        """
        self.output_format = transform
        return self


# MLAnalytics

Responsible for validating instructions, and writting results files. 

In [3]:
class MLAnalytics:
    """
    A base class to handle shared functionality for analytics-related tasks
    like logging and signing pipeline configurations.
    """

    def __init__(self, base_dir):
        """
        Initializes common functionality for both signing and logging modes.

        Args:
            base_dir (str): The base directory that contains subdirectories for logs and results.
        """
        self.base_dir = base_dir
        self.config_log_dir = os.path.join(self.base_dir, "config")
        self.csv_file_path = os.path.join(self.base_dir, "results", "best_runs.csv")

        # Ensure necessary directories exist
        self.check_dir_exists(self.config_log_dir, create=True)
        self.check_dir_exists(os.path.dirname(self.csv_file_path), create=True)

    def check_dir_exists(self, dir_path, create=False):
        """Ensures that the provided directory path exists. Optionally creates it if necessary based on user input."""
        if dir_path and not os.path.exists(dir_path):
            if create:
                response = input(f"Directory '{dir_path}' does not exist. Do you want to create it? (y/n): ").strip().lower()
                if response == 'y':
                    # Case when it is okay to create directory, and choice is yes.
                    os.makedirs(dir_path, exist_ok=True)
                    print(f"Directory '{dir_path}' has been created.")
                else:
                    # Case when it is okay to create directory, but choice is no.
                    print(f"Directory '{dir_path}' was not created.")
                    return False
            else:
                # Case when it is not allowed to create a directory and none valid was given.
                raise FileNotFoundError(f"No directory found at {dir_path}")
        # Case when existing directory is provided.
        return True

    def get_target_dir_current_id(self, target_dir):
        """
        Returns the current available run ID by checking the run log directory.

        Returns:
            int: The current available run ID.
        """
        
        if self.check_dir_exists(target_dir, create=True):
        
            existing_files = os.listdir(target_dir)
            target_ids = []
            for f in existing_files:
                if f.endswith('.json'):
                    try:
                        target_id = int(f.split('_')[-1].replace('.json', ''))
                        target_ids.append(target_id)
                    except ValueError:
                        print(f"Skipping file with unexpected name format: {f}")

            return max(target_ids, default=0) + 1

    def get_target_dir_target_id(self, target_dir=None, target_id=None):
        if not target_dir and target_id:
            raise ValueError("Must provide directory and id, to fetch.")

        filename = f"{target_dir}_id_{target_id}"
        
        target_dir = os.path.join(self.base_dir, target_dir)

        self.check_dir_exists(target_dir, create=False)
        
        try:
            # Construct the filename
            filename = f"{target_dir}/{filename}.json"
            with open(filename, "r") as f:
                instructions_json = json.load(f)

        except FileNotFoundError:
            raise FileNotFoundError(f"No configuration file found at {filename}")
        except json.JSONDecodeError:
            raise ValueError(f"Failed to decode JSON in the file at {filename}")

        return instructions_json

class MLAuditer(MLAnalytics):
    """Subclass of MLAnalytics for signing and storing pipeline configurations."""

    def sign_and_save_config(self, pipeline_config):
        """
        Signs a pipeline configuration with a run_id and saves it as JSON.
        
        Args:
            pipeline_config (dict): The pipeline configuration to sign and save.
        
        Returns:
            str: Path to the saved JSON file.
        """
        signed_config = pipeline_config.copy()
        signed_config["config_id"] = self.get_target_dir_current_id(self.config_log_dir)  # Generate a config ID here

        json_file = os.path.join(self.config_log_dir, f"config_id_{signed_config['config_id']}.json")
        try:
            with open(json_file, "w") as f:
                json.dump(signed_config, f, indent=4)
            print(f"Signed pipeline configuration saved at: {json_file}")
        except IOError as e:
            print(f"Error saving configuration: {e}")
            raise

        return json_file


class MLLogger(MLAnalytics):
    """Subclass of MLAnalytics for logging and storing results."""

    def __init__(self, base_dir, instructions_json):
        """
        Initializes the logger with directory and instructions.
        
        Args:
            base_dir (str): Base directory for saving results.
            instructions_json (dict): The instructions containing configuration details.
        """
        super().__init__(base_dir)
        self.config_id = instructions_json.get("config_id")  # Retrieve config_id from instructions

    def results_to_csv(self, result_data):
        """
        Appends results to a CSV file, injecting the config_id and run_id.
        
        Args:
            result_data (list of dicts): The result data to append to the CSV.
        
        Returns:
            None
        """
        if not result_data:
            raise ValueError("No result data to save.")

        df = pd.DataFrame(result_data)
        df['config_id'] = self.config_id
        
        try:
            if not os.path.exists(self.csv_file_path):
                df.to_csv(self.csv_file_path, index=False)
            else:
                df.to_csv(self.csv_file_path, mode="a", header=False, index=False)
            print(f"Results saved to {self.csv_file_path}")
        except IOError as e:
            print(f"Error saving results to CSV: {e}")
            raise

# PipelineBuilder

Pipelinebuilder assists with the creation of pipelines.

In [4]:
class PipelineBuilder:
    """
    Base class for building machine learning pipelines.
    """
    def __init__(self, column_names=None, auditer=None):
        if self.__class__ is PipelineBuilder:
            raise TypeError("Cannot instantiate abstract class PipelineBuilder.")

        self.column_names = column_names or []  # Store column names if provided
        self._blueprints = {}  # Dictionary to hold pipeline instructions
                
        if not auditer:
            raise ValueError("Cannot run PipelineBuiler without an instance of auditer")

        self.auditer = auditer
    
    def _inspect_pipelines(self):
        """
        Display all pipelines stored in the _blueprints.
        """
        if not self._blueprints:
            print("No pipelines have been created yet.")
        else:
            print("Current Pipelines:")
            for name, steps in self._blueprints.items():
                print(f"Pipeline '{name}':")
                for idx, step in enumerate(steps['sections'], 1):
                    print(f"  {idx}. {step}")
    
        eligible_params = self._get_class_params(section_class)
        return True, section_class, eligible_params

    def _get_class_params(self, section_class):
        """
        Retrieves eligible parameters for a class using inspection.
        """
        params = inspect.signature(section_class).parameters
        return list(params.keys())
    
    def _retrieve_class_from_scope(self, section_name):
        """
        Retrieves the class object from global or local scope.
        """
        section_class = globals().get(section_name) or locals().get(section_name)
        if section_class is None:
            print(f"Error: '{section_name}' is not defined in the current scope. Try again.")
            return None
        if not callable(section_class):
            print(f"Error: '{section_name}' is not callable. Try again.")
            return None
        return section_class
        
    def _validate_section(self, section_name, is_predictor):
        """
        Validates if a given section can be added to the pipeline.
        """
        section_class = self._retrieve_class_from_scope(section_name)
        if not section_class:
            return False, None, []
    
        if is_predictor:
            valid, section_class  = self._validate_predictor(section_class)
        else:
            valid, section_class = self._validate_transformer(section_class)
 
        if not valid:
            return False, None, []
        else:
            valid_params = self._get_class_params(section_class)
            return valid, section_class, valid_params
            
    def _validate_predictor(self, section_class):
        """
        Validates if the given class is a predictor.
        """
        if not hasattr(section_class, 'predict') or \
           not any(base.__name__ in ['BaseEstimator', 'BaseEnsemble'] for base in inspect.getmro(section_class)):
            print(f"Error: '{section_class.__name__}' is not a valid predictor "
                  f"(no 'predict' method or not derived from 'BaseEstimator' or 'BaseEnsemble'). Try again.")
            return False, None
        return True, section_class

    def _validate_transformer(self, section_class):
        """
        Validates if the given class is a transformer.
        """
        if not hasattr(section_class, 'transform') or \
           'TransformerMixin' not in [base.__name__ for base in inspect.getmro(section_class)]:
            print(f"Error: '{section_class.__name__}' is not a valid transformer "
                  f"(no 'transform' method or not derived from 'TransformerMixin'). Try again.")
            return False, None
        return True, section_class

class IterPlumber(PipelineBuilder):
    """
    Interactive pipeline builder allowing step-by-step creation and management of multiple pipelines.
    """
    def __init__(self, column_names=None, auditer=None):
        if auditer is None:
            raise ValueError("IterPlumber requires an instance of auditer to be provided.")
        super().__init__(column_names=column_names, auditer=auditer)

        # Initialize available sections as an empty 
        self.available_sections = {
            'transformers': {}, 
            'predictors' : {}
        } # Tracks user-created components not yet in use
        
    def run_pipes(self):
        """
        Main method to manage the creation of pipeline instructions for multiple pipelines iteratively.
        """
        while True:
            user_input = input(
                """
    Welcome to IterPlumber! Please select an option:
    1. Create a new pipeline
    2. View current pipelines
    3. Finalize blueprints
    4. Cancel and exit
    Your choice: """.strip()).strip()
    
            if user_input == '1':
                # Initialize the current pipeline as an empty dictionary
                print("\nFollow the instructions to iteratively build a pipeline.")
                current_pipeline = self._build_pipeline()
    
                # If a pipeline was successfully built
                if current_pipeline:
                    pipeline_name = current_pipeline['name']
                    self._blueprints[pipeline_name] = current_pipeline
                    print(f"Pipeline '{pipeline_name}' saved successfully.")
                else:
                    print("Pipeline creation was cancelled or not completed.")
    
            elif user_input == '2':
                # View current pipelines
                if self._blueprints:
                    print("\nCurrent pipelines:")
                    for name, pipeline in self._blueprints.items():
                        print(f"- {name}: {pipeline}")
                else:
                    print("\nNo pipelines have been created yet.")
    
            elif user_input == '3':
                # Finalize and save the pipelines
                print("\nFinalizing and saving pipelines...")
                self._finalize_blueprints()
                print("Pipelines finalized successfully. Goodbye!")
                break
    
            elif user_input == '4':
                # Cancel and exit
                print("\nCancelling all operations. Goodbye!")
                break
    
            else:
                # Handle invalid input
                print("\nInvalid choice. Please select a valid option (1-4).")


    def _build_pipeline(self):
        """
        Helper method to interactively build pipeline instructions for a single pipeline.
        """
        self.current_column_names = {col: col for col in self.column_names}  # Keeps column mapping updated dynamically
        
        # Empty pipeline is instanced
        section_id = 0
        section_number = 0
        
        this_pipeline = {
            'name': ''
            , 'n_sections': 0
            , 'sections': {}
        }

        # Flow controls for pipeline assembly
        has_transformer = False
        has_predictor = False
    
        while True:
            print("\n--- Pipeline Creation Menu ---")
            print("To build a pipeline, add at least one transformer and one predictor section."
                  "The last section must always be a predictor. Sections must be built before they can be added.")
            print("\nOptions:")
            print("1. Build a transformer section")
            print("   - A transformer applies preprocessing steps to your data, such as scaling, encoding, or imputation.")
            print("2. Build a predictor")
            print("   - A predictor is the final step in the pipeline, such as a regression model or classifier.")
            print("3. Add a section to the pipeline")
            print("   - Use this to integrate a previously defined transformer or predictor into the pipeline.")
            print("4. View current pipeline")
            print("   - Displays the steps currently added to this pipeline, in the order they will be applied.")
            print("5. View current available sections")
            print("   - Displays the steps currently added to this pipeline, in the order they will be applied.")
            print("6. Finish and save this pipeline")
            print("   - Completes the pipeline creation process and saves the current pipeline.")
            print("7. Cancel this pipeline")
            print("   - Discards the current pipeline and returns to the main menu.")
            
            choice = input("Enter your choice: ").strip()
            
            if choice == '1':
                print("Building transformer section.")
                self._handle_section()
        
            elif choice == '2':
                print("Building predictor.")
                self._handle_section(is_predictor=True)
                
            elif choice == '3':
                if has_predictor:
                    print('Unable to add more sections, pipeline already has a predictor')
                    break
                
                if self.available_sections['transformers'] or self.available_sections['predictors']:
                    while True:
                        print("Adding section to the pipeline.")
                        print("Select which type of section you wish to add to the pipeline")
                        print("1. Add a transformer")
                        print("2. Add a predictor")
                        print("3. Add a column_transformer (requires a transformer)")
                        print("4. Go back to Pipeline creation menu.")
    
                        self._view_sections()
                        
                        choice = input("Enter your choice: ").strip()
                        if choice == '1':
                            if self.available_sections['transformers']:
                                has_transformer = self._add_section(this_pipeline, is_predictor=False)
                            else:
                                print('No transfomers available to add.')
                            
                        elif choice == '2':
                            if self.available_sections['predictors']:
                                has_predictor = self._add_section(this_pipeline, is_predictor=True)
                            else:
                                print('No predictors available to add.')
                        elif choice == '3':
                            if self.available_sections['transformers']:
                                has_transformer = self._add_section(this_pipeline, is_column_transformer=True)

                        elif choice == '4':
                            print('Returning to previous menu.')
                            break
                        else:
                            print('No transfomers available to add.')
                            
                else:
                    print("No pipes available to add.")
                
            elif choice == '4':
                self._view_pipeline(this_pipeline)
                
            elif choice == '5':
                self._view_sections()
                
            elif choice == '6':
                if has_predictor:
                    predictor_key = max(this_pipeline['sections'].keys())
                    this_pipeline['name'] = this_pipeline['sections'][predictor_key]['name']
                
                    print("Saving the current pipeline with predictor:", this_pipeline['name'] )
                    return this_pipeline
                    
                else:
                    print("Cannot store empty or incomplete pipeline blueprints. Please try again.")
                    
            elif choice == '7': 
                print("Cancelling the current pipeline and returning to the main menu.")
                input("Press any key to continue")
                return None
                
            else:
                print("Invalid choice. Please try again.")
            
            input("Press any key to continue")
        
                
    def _handle_section(self, is_predictor=False):
        """Handles flow into _build_section which itself provides an iterative procedure to generate pipelines, receives a section in return and parses it into the available_sections"""
        if is_predictor:
            string = "predictors"
        else:
            string = "transformers"
        
        section_name = input(f"Enter the {string} class name: ").strip()   

        if section_name.lower() in self.available_sections[string]:
            print(f'Failed to create {section_name}. Another section with that name already exists')
            valid = False
        else:
            valid, section_class, eligible_params = self._validate_section(section_name, is_predictor)
        
        if valid:
            section = self._build_section(section_name, section_class, eligible_params)

            self.available_sections[string][section['_name']] = {
                'name': section['_name'],
                'class': section['_class'],
                'args': section['_args'],
                'grid': section['_grid'],
                'columns' : section['_columns']
            }
            
        else:
            input(f"Failed to build {string} section. Press any key to return to the previous menu")
                
    def _build_section(self, section_name, section_class, eligible_params):
        """
        Prompts the user to provide values for eligible parameters of a section
        and validates them by instantiating the class and calling its `fit` method.
        """
        # Try to print documentation to assist the user
        try:
            print(section_class.__doc__)
        except Exception as e:
            print(f"\nUnexpected issue with docstring: {e}")
    
        print(f"Now building '{section_name.lower()}'. Ensure that parameter values are correct (Docstring above).")
    
        raw_params, params = {}, {}
        raw_grid_params, grid_params = {}, {}
         

        for param in eligible_params:
            user_input = input(f"Enter a value for '{param}' (or an iterable like a list or range for grid search if applicable): ")
            try:
                if user_input == '':
                    user_input = None
                else:
                    parsed_input = eval(user_input)
            except (ValueError, SyntaxError, NameError):
                parsed_input = user_input
        
            if not user_input:
                continue
            
            if isinstance(parsed_input, (dict, list, Iterable)) and not isinstance(parsed_input, str):
                raw_grid_params[param], grid_params[param] = user_input, parsed_input
            else:
                raw_params[param], params[param] = parsed_input, parsed_input
                
        if not grid_params:
                max_iterations = 1
                grid_iterators = {}
        else:
            grid_iterators = {param: iter(values) for param, values in grid_params.items()}
                 
        # Testing loop
        number_iterators = len(grid_params.keys())
        number_stop_iterators = 0
        current_args = params.copy()
        
        try:
            while number_stop_iterators != number_iterators:
                number_stop_iterators = 0
                for param, iterator in grid_iterators.items():
                    try:
                        current_args[param] = next(iterator)
                    except StopIteration:
                        number_stop_iterators += 1
                    
                print(f"Testing with arguments: {current_args}")
                dummy_data_x, dummy_data_y = [[0]], [[0]]  # Placeholder data
                section_instance = section_class(**current_args)
                try:
                    section_instance.fit(dummy_data_x)  # Validate the instance
                except Exception as e:
                    section_instance.fit(dummy_data_x, dummy_data_y)
        except Exception as e:
            print(f"Failed with arguments {current_args}: {e}")
            retry = input("Invalid arguments detected, try again? (y/n): ").strip().lower()
            if retry != 'y':
                print("Exiting pipeline creation.")
                return None
            else:
                # Retry the entire section
                return self._build_section(section_name, section_class, eligible_params)
    
        print(f"Successfully tested all arguments,")
        section = {
            '_name': section_class.__name__.lower(),
            '_class': section_class.__name__,
            '_args': raw_params,
            '_grid': raw_grid_params,
            '_columns': []
        }
    
        return section

    
    def _add_section(self, pipeline, is_predictor=False, is_column_transformer=False):
        """
        Add a section to the pipeline, either as a predictor or a transformer.
        Handles column selection for column transformers iteratively.
    
        Args:
            pipeline (dict): The current pipeline being built.
            is_predictor (bool): Whether the section is a predictor. Defaults to False.
            is_column_transformer (bool): Whether the section is a column transformer. Defaults to False.
    
        Returns:
            bool: True if at least one section was added, False otherwise.
        """
        section_type = 'predictors' if is_predictor else 'transformers'
        selectable_sections = self.available_sections[section_type].copy()
        selected_sections = {}
        
        # For column transformers, maintain a list of selectable columns
        selectable_columns = self.column_names.copy() if is_column_transformer else None
        has_more_to_add = True
        at_least_one_selected = False
        section_number = 0  # Internal section number (increments for column transformers)
    
        while has_more_to_add:
            # Step 1: Handle column selection if necessary
            selected_columns = None
            if is_column_transformer:
                selected_columns = self._select_columns(selectable_columns)
                if not selected_columns:
                    print("No columns selected. Returning to the previous menu.")
                    return False
                # Update the remaining selectable columns
                selectable_columns = [col for col in selectable_columns if col not in selected_columns]
    
            # Step 2: Handle section selection
            section_name, section_details = self._select_section(selectable_sections)
            if not section_name:
                print("No section selected. Returning to the previous menu.")
                return False
                        
            # Add columns if this is a column transformer
            if is_column_transformer:
                section_details['name'] = section_name
                section_details['columns'] = selected_columns
                print(f"Assigned columns {selected_columns} to {section_name}.")
                
            
            # Increment for each section selected
            section_number += 1  
            
            # Store the selected section
            selected_sections[section_number] = section_details
            selectable_sections.pop(section_name, None)  # Remove from available sections
            
            at_least_one_selected = True
    
            # Step 3: Decide whether to add more transformers to this column transformer
            if is_column_transformer and selectable_columns and selectable_sections:
                add_more = input("Add another transformer to the column transformer? (y/n): ").strip().lower()
                if add_more != 'y':
                    print("Finalizing current column transformer.")
                    has_more_to_add = False
            else:
                has_more_to_add = False  # No more sections or columns to add
    
        if at_least_one_selected:
            # Update the id before incrementing
            section_id = pipeline['n_sections'] + 1
                
            # Step 4: Add column transformer or section to the pipeline
            if is_column_transformer:
                # Create a composite name for the column transformer
                transformer_name = '_'.join(
                    ['column_transformer'] + [
                        selected_sections[section_number]['name'] for section_number in selected_sections
                    ]
                )
                column_transformer = {
                    'name': transformer_name,
                    'transformers': selected_sections
                }
                # Add the column transformer to the pipeline
                pipeline['sections'][section_id] = column_transformer

            else:
                # Add single predictor/transformer to the pipeline
                section_number = next(iter(selected_sections))  # Extract the single selected section
                pipeline['sections'][section_id] = selected_sections[section_number]
            
            # Remove selected sections from available_sections
            for section_number in selected_sections:
                del self.available_sections[section_type][selected_sections[section_number]['name']]

            # Increment the number of sections
            pipeline['n_sections'] += 1
        
            return True
            
        else:
            print("No sections were successfully added to the pipeline.")
            return False

    def _select_section(self, available_sections):
        while True:
            print(f"Available sections: \n{available_sections.keys()}")
            chosen_section = input("Select a transformer by entering its name (see above)")
            
            if chosen_section not in available_sections.keys():
                print("Failed to match your input with an available section")
                retry = input("Try again y/n?")
                if retry == 'y':
                    continue
                else:
                    print('Returning to the previous menu')
                    return None, None
            else:
                return chosen_section, available_sections[chosen_section]
                    
    def _select_columns(self, remaining_columns):
        """
        Prompt the user to select specific columns for the given transformer section using indices,
        ranges, or slices. Allows the user to cancel and return to the previous menu.
        """
        while True:
            # Display available columns with indices
            print("\nAvailable columns:")
            for idx, col in enumerate(remaining_columns, start=1):
                print(f"{idx}. {col}")
            
            print("\nEnter column indices, ranges (e.g., range(1, 5)), or slices (e.g., 1:5).")
            print("Type 'cancel' to return to the previous menu.")
            
            # Get user input
            user_input = input("Enter indices, ranges, or 'cancel': ").strip()
            
            # Handle cancel option
            if user_input.lower() == 'cancel':
                print("Returning to the previous menu.")
                return None
            
            try:
                # Try to evaluate the entire input as an iterable
                selected_indices = []
                try:
                    evaluated = eval(user_input)
                    if isinstance(evaluated, Iterable):
                        selected_indices.extend(list(evaluated))
                    else:
                        raise ValueError("Input is not an iterable.")
                except Exception:
                    # If not an iterable, split and process parts individually
                    inputs = [item.strip() for item in user_input.split(',')]
                    for item in inputs:
                        selected_indices.extend(list(eval(item)))
                
                # Deduplicate and validate indices
                selected_indices = sorted(set(selected_indices))  # Remove duplicates
                if not all(1 <= idx <= len(remaining_columns) for idx in selected_indices):
                    print(f"Invalid indices. Please ensure values are between 1 and {len(remaining_columns)}.")
                    continue
                
                # Translate indices to column names
                selected_columns = [remaining_columns[idx - 1] for idx in selected_indices]
                
                # Return the selected columns
                return selected_columns
            except Exception as e:
                print(f"Invalid input: {e}. Please enter valid indices, ranges, or slices.")
    
    def _view_sections(self):
        for section_type in self.available_sections.items():
            if not section_type:
                print(f"No sections available for type {section_type}")
            else:
                for section in section_type:
                    print(section),
                print()  
                
    def _view_pipeline(self, this_pipeline):
        if not this_pipeline:
            print(" No sections in use/available.")
        else:
            print("Current pipeline:")
            for step_name, step_details in this_pipeline['sections'].items():
                print(f"- {step_name}: {step_details}")

    def _finalize_blueprints(self):
        """
        Finalize all blueprints by saving them using an external auditor.
        """
        if not self._blueprints:
            print("No pipelines were created. Nothing to save.")
            return
    
        print("\nFinalizing and saving all created pipelines...")
        # Call auditer to sign the blueprint to the working directory
        self.auditer.sign_and_save_config(self._blueprints)
        print("Pipelines saved successfully.")

# Fitter

Trains models using instructions

In [5]:
class Fitter:
    
    def __init__(self, X, y, instructions_json, logger):
        
        if not instructions_json or not logger:
            raise ValueError("Both 'instructions_json' and 'logger' must be provided.")
        
        self.X = X
        self.y = y
        self.ml_logger = logger
        
        # Parse blueprints into pipelines and grids
        self.instructions = self.lay_pipeline(instructions_json.copy())

        print(self.instructions)
        
        self.outer_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
        self.inner_cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
        self.results_buffer = []
        
    def lay_pipeline(self, json_blueprints):
        pipelines_and_grids = []
        column_mapping = {col: col for col in X_train.columns}  # Initial mapping
        
        for blueprint_name, blueprint_data in json_blueprints.items():
            if blueprint_name == 'config_id':
                continue
            
            sections = blueprint_data['sections']
            steps = []
            grid_params = {}
    
            for section_id, section_info in sections.items():
                print(section_id)
                if 'transformers' in section_info:  # Column Transformer
                    column_transformer_steps = []
                    for transformer in section_info['transformers'].values():
                        # Evaluate arguments for the transformer
                        for key, value in transformer['args'].items():
                            try:
                                evaluated_value = eval(value)
                                if isinstance(evaluated_value, object):
                                    transformer['args'][key] = evaluated_value
                            except:
                                pass  # If evaluation fails, retain the original value
                        
                        # Replace column names with mapped names
                        transformer_columns = [column_mapping.get(col, col) for col in transformer['columns']]
    
                        # Transformer step details
                        transformer_name = transformer['name']
                        transformer_class = eval(transformer['class'])(**transformer['args'])
    
                        # Add transformer to ColumnTransformer steps
                        column_transformer_steps.append((transformer_name, transformer_class, transformer_columns))
    
                        # Extract grid parameters for the transformer
                        for param_key, param_values in transformer['grid'].items():
                            grid_key = f"{section_info['name']}__{transformer_name}__{param_key}"
                            grid_params[grid_key] = eval(param_values)
    
                    # Add the ColumnTransformer to the pipeline and ensure pandas output
                    column_transformer = ColumnTransformer(column_transformer_steps, remainder='passthrough')
                    column_transformer.set_output(transform="pandas")
                    
                    # Add the fitted column transformer to the pipeline
                    steps.append((section_info['name'], column_transformer))
                    
                else:  # Model/Predictor
                    # Evaluate arguments for the model
                    for key, value in section_info['args'].items():
                        try:
                            evaluated_value = eval(value)
                            if isinstance(evaluated_value, object):
                                section_info['args'][key] = evaluated_value
                        except:
                            pass  # If evaluation fails, retain the original value
    
                    step_name = section_info['name']
                    model_instance = eval(section_info['class'])(**section_info['args'])
                    steps.append((step_name, model_instance))
    
                    # Extract grid parameters for the model
                    for param_key, param_values in section_info['grid'].items():
                        grid_key = f"{step_name}__{param_key}"
                        grid_params[grid_key] = eval(param_values)
    
            # Create the pipeline and ensure pandas output
            pipeline = Pipeline(steps)
            pipeline.set_output(transform="pandas")
            pipelines_and_grids.append((blueprint_name, pipeline, grid_params))
    
        return pipelines_and_grids

    def process_and_store_results(self, model_name, df_results, best_fold_index):
        """
        Processes cross-validation results and stores them in a structured format.

        Args:
            model_name (str): Name of the model.
            df_results (dict): Cross-validation results.
            best_fold_index (int): Index of the best scoring fold.

        Returns:
            dict: Processed results for the model.
        """
        best_fold_estimator = df_results['estimator'][best_fold_index]
        best_score = df_results['test_score'][best_fold_index]
        best_params = best_fold_estimator.best_params_
        best_fold_estimator.fit(self.X, self.y)
        refit_train_score = best_fold_estimator.score(self.X, self.y)
        
        mean_train_score = df_results['train_score'].mean()
        mean_test_score = df_results['test_score'].mean()
        fit_time = df_results['fit_time'][best_fold_index]
        score_time = df_results['score_time'][best_fold_index]

        model_result = {
            "model": model_name,
            "best_hyperparameters": best_params,
            "best_test_score": best_score,
            "mean_train_score": mean_train_score,
            "mean_test_score": mean_test_score,
            "fit_time": fit_time,
            "score_time": score_time,
            "refit_train_score": refit_train_score,
        }

        self.results_buffer.append(model_result)

    def run(self):
        """
        Executes the fitting process for all instructions and handles results storage.
        """
        try:
            for model_name, pipeline, params in self.instructions:     
                grid_search = GridSearchCV(
                    estimator=pipeline,
                    param_grid=params,
                    scoring='f1_macro',
                    cv=self.inner_cv,
                    n_jobs=1,
                    refit=True,
                    verbose=2         
                )
    
                cv_results = cross_validate(
                    estimator=grid_search,
                    X=self.X,
                    y=self.y,
                    cv=self.outer_cv,
                    return_train_score=True,
                    return_estimator=True,
                    scoring="f1_macro",
                    n_jobs=1,
                    verbose=2
                )
    
                best_fold_index = cv_results['test_score'].argmax()
                self.process_and_store_results(model_name, cv_results, best_fold_index)
        
        finally:
            self.ml_logger.results_to_csv(self.results_buffer)
            print("All results saved by the logger.")

# Run

In [6]:
# Defines the base directory for the process
base_dir = os.getcwd()

## Ingestion

Use the cell below to load a data file.

In [7]:
# Data is ingested from the working directory, with the index set to col_0 and dtypes applied directly
with open('../data/dtypes.json', 'r') as file:
    train = pd.read_csv('../data/preproc_train.csv', index_col=0, dtype=json.load(file), low_memory=True)

# Data is ingested from the working directory, with the index set to col_0 and dtypes applied directly
with open('../data/dtypes.json', 'r') as file:
    test = pd.read_csv('../data/preproc_test.csv', index_col=0, dtype=json.load(file), low_memory=True)

# Convert all NaN values to np.nan
train = train.fillna(np.nan)

X = train.drop(columns=['claim_injury_type']).copy()
y = train['claim_injury_type']

# Drop missing from y
y = y.dropna()

# Align X and y based on y's indices
X = X.loc[y.index]

# Now select numeric columns for X, and sample 20k for ease of testing
X = X.sample(n=20000)

# Slice y on the selected X indices
y = y[X.index]

In [8]:
with_missing_features = [
    'accident_date',
    'age_at_injury',
    'average_weekly_wage',
    'birth_year',
    'c_2_date',
    'c_3_date',
    'first_hearing_date',
    'gender',
    'ime_4_count',
    'industry_code',
    'industry_code_description',
    'wcio_cause_of_injury_code',
    'wcio_cause_of_injury_description',
    'wcio_nature_of_injury_code',
    'wcio_nature_of_injury_description',
    'wcio_part_of_body_code',
    'wcio_part_of_body_description'
]

metric_features = [
    'dd_asb_c2',
    'dd_asb_c3',
    'dd_c2_c3',
    'age_at_injury', 
    'ime_4_count', 
    'average_weekly_wage', 
    'birth_year',
    'dependants',
    'first_hearing_date_day',
    'first_hearing_date_month',
    'first_hearing_date_year',
    'c_2_date_day',
    'c_2_date_month',
    'c_2_date_year',
    'c_3_date_day',
    'c_3_date_month',
    'c_3_date_year',
    'assembly_date_day',
    'assembly_date_month',
    'assembly_date_year',
    'accident_date_day',
    'accident_date_month',
    'accident_date_year',
]

binary_features = [
    'age_at_injury_zero',
    'is_unionized',
    'alternative_dispute_resolution',
    'attorney_representative',
    'covid_19_indicator',
    'do_1',
    'do_10',
    'do_11',
    'do_12',
    'do_13',
    'do_14',
    'do_15',
    'do_16',
    'do_2',
    'do_3',
    'do_4',
    'do_5',
    'do_6',
    'do_7',
    'do_8',
    'do_9',
]

categorical_features = [
    'carrier_name',
    'carrier_type',
    'county_of_injury',
    'district_name',
    'industry_code',
    'industry_code_description',
    'medical_fee_region',
    'wcio_cause_of_injury_code',
    'wcio_cause_of_injury_description',
    'wcio_nature_of_injury_code',
    'wcio_nature_of_injury_description',
    'wcio_part_of_body_code',
    'wcio_part_of_body_description',
    'zip_code',
    'cause_of_injury_group'
]

In [9]:
X.select_dtypes(include='category').columns.tolist()

['carrier_type',
 'county_of_injury',
 'medical_fee_region',
 'cause_of_injury_group',
 'part_of_body_group']

In [10]:
print(X.isna().sum().sum())  # Total count of NaNs in X
print(y.isna().sum())  # Total count of NaNs in y

3727
0


In [11]:
train.info()

<class 'pandas.core.frame.DataFrame'>
Index: 572325 entries, 5393875 to 6165075
Data columns (total 90 columns):
 #   Column                                     Non-Null Count   Dtype   
---  ------                                     --------------   -----   
 0   age_at_injury                              566814 non-null  float64 
 1   alternative_dispute_resolution             572325 non-null  float64 
 2   attorney_representative                    572325 non-null  float64 
 3   average_weekly_wage                        543741 non-null  float64 
 4   birth_year                                 543333 non-null  float64 
 5   carrier_type                               572325 non-null  category
 6   claim_injury_type                          572325 non-null  category
 7   county_of_injury                           572325 non-null  category
 8   covid_19_indicator                         572325 non-null  float64 
 9   gender                                     567575 non-null  float64 

## Generate instructions

Import all necessary libraries here.

In [12]:
# Transformer imports
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.linear_model import LogisticRegression

# Recursive Feature Selection
from sklearn.feature_selection import RFE

# Predictor imports
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier

In [13]:
# Creates an instance of the Auditer
auditer = MLAuditer(base_dir=base_dir)

## Fit data

Consumes instructions, as pipelines, via gridsearch, cross-validation.

In [14]:
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Auditer passes a valid instruction set 
instructions = auditer.get_target_dir_target_id(target_dir="config", target_id="2")

# Creates an instance of the Logger
logger = MLLogger(base_dir=base_dir, instructions_json=instructions)

# Create the Fitter instance
fitter = Fitter(X_train, y_train, instructions_json=instructions, logger=logger)

# Run the fitting process
fitter.run()

1
2
3
4
5
6
[('randomforestclassifier', Pipeline(steps=[('column_transformer_simpleimputer',
                 ColumnTransformer(remainder='passthrough',
                                   transformers=[('simpleimputer',
                                                  SimpleImputer(strategy='most_frequent'),
                                                  ['carrier_type',
                                                   'county_of_injury',
                                                   'part_of_body_group',
                                                   'cause_of_injury_group'])])),
                ('column_transformer_onehotencoder',
                 ColumnTransformer(remainder='passthrough',
                                   transformers=[('onehot...
                                                   'c_2_date_month',
                                                   'c_2_date_day',
                                                   'c_3_date_year',
                   



[CV] END .................................................... total time=   0.0s
[CV] END .................................................... total time=   0.0s
[CV] END .................................................... total time=   0.1s
Fitting 3 folds for each of 1 candidates, totalling 3 fits
[CV] END .................................................... total time=   0.0s
[CV] END .................................................... total time=   0.0s
[CV] END .................................................... total time=   0.0s
[CV] END .................................................... total time=   0.1s


ValueError: No result data to save.

Fin