In [None]:
import os

# Folder that receives the data-preparation module written by the next cell.
data_prep_dir = "./prep_src"
# The original call referenced the undefined name `modules_dir` and was
# missing its closing parenthesis; the directory to create is data_prep_dir.
os.makedirs(data_prep_dir, exist_ok=True)

In [None]:
%%writefile {data_prep_dir}/data_prep_functions.py
            
import logging
import os

import matplotlib.pyplot as plt
import mlflow
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import SimpleImputer
from sklearn.impute import IterativeImputer
from sklearn.model_selection import train_test_split

# Configure root logging at INFO level and create the module-level logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


# NOTE(review): these two imports repeat imports made at the top of the module.
import matplotlib.pyplot as plt
import seaborn as sns

# Directory to save the plots
# NOTE(review): not referenced by any function visible in this module.
plot_save_dir = 'plots'

def create_directory_if_not_exists(directory):
    """
    Create a directory (including missing parents) unless it already exists.

    Parameters:
        directory (str): Path of the directory to create.

    Returns:
        None

    Raises:
        FileExistsError: If the path exists but is not a directory.
    """
    # exist_ok=True replaces the check-then-create sequence, which could fail
    # if another process created the directory between the two steps.
    os.makedirs(directory, exist_ok=True)

def check_missing_values(df, artifact_save_dir='artefacts'):
    """
    Check and visualize missing values in a DataFrame.

    Draws a heatmap of ``df.isnull()``, logs the number of missing values for
    every column that has at least one, saves the heatmap as a PNG inside
    ``artifact_save_dir`` and logs that file to MLflow.

    Parameters:
        df (pd.DataFrame): The DataFrame to check for missing values.
        artifact_save_dir (str): Directory to save the heatmap plot and other artifacts.

    Returns:
        None
    """
    log = logging.getLogger(__name__)

    # Check for missing values and compute the count of missing values in each column
    missing_mask = df.isnull()
    missing_values = missing_mask.sum()

    # Plot a heatmap of missing values; keep the figure handle so it can be
    # closed explicitly once it has been saved and shown.
    fig = plt.figure(figsize=(12, 6))
    sns.heatmap(missing_mask, cbar=False, cmap='viridis')
    plt.title('Missing Values Heatmap')
    plt.xlabel('Columns')
    plt.ylabel('Rows')

    # List the number of missing values in each column
    log.info("Number of missing values in each column:")
    for column, count in missing_values.items():
        if count > 0:
            log.info(f"Column '{column}' had {count} missing values.")

    # Create the artifact directory if needed (safe when it already exists)
    os.makedirs(artifact_save_dir, exist_ok=True)

    # Save the heatmap plot as an image in the specified directory
    plot_name = 'missing_values_heatmap'
    plot_save_path = os.path.join(artifact_save_dir, f"{plot_name}.png")
    fig.savefig(plot_save_path)
    log.info(f'{plot_name} saved at: {plot_save_path}')

    # Log the heatmap plot as an artifact using MLflow.
    # NOTE(review): MLflow treats artifact_path as a destination *directory*,
    # so the file ends up under a folder named '<plot_name>.png'. Left as-is
    # so existing artifact locations do not move - confirm this is intended.
    mlflow.log_artifact(plot_save_path, artifact_path=f'{plot_name}.png')

    plt.show()
    # Release the figure; otherwise every call leaves one more open figure.
    plt.close(fig)

    

def _attach_log_file(logger, log_filepath):
    """Make `logger` also write to `log_filepath`, adding the handler only once."""
    target = os.path.abspath(log_filepath)
    for handler in logger.handlers:
        if getattr(handler, 'baseFilename', None) == target:
            return
    os.makedirs(os.path.dirname(target), exist_ok=True)
    file_handler = logging.FileHandler(target)
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(file_handler)


def replace_missing_values(df, ms_threshold: int, artifact_save_dir='artefacts'):
    """
    Replace missing values in a DataFrame using interpolation or iterative imputation.

    For every column with missing values, the distance (in rows) between
    consecutive missing entries is measured. Columns whose gaps are all at most
    ``ms_threshold`` (including columns with a single missing value) are filled
    by linear interpolation; the remaining columns with missing values are
    filled with scikit-learn's ``IterativeImputer``.

    Parameters:
        df (pd.DataFrame): The DataFrame containing missing values. It is modified in place.
        ms_threshold (int): Threshold to switch between interpolation and iterative imputer.
        artifact_save_dir (str, optional): Directory that receives
            'replace_missing_values.log' (default: 'artefacts'). Pass None or ''
            to skip file logging.

    Returns:
        pd.DataFrame: The same DataFrame object with missing values replaced.
    """
    # Create a logger
    log = logging.getLogger(__name__)

    # A second logging.basicConfig() call is ignored once the root logger is
    # configured (it is at import time), so attach a file handler explicitly.
    if artifact_save_dir:
        _attach_log_file(log, os.path.join(artifact_save_dir, 'replace_missing_values.log'))

    columns_to_interpolate = []
    columns_to_iterative_impute = []

    # Sort every column that has missing values into one of the two strategies
    for column, count in df.isnull().sum().items():
        if count == 0:
            continue
        # Row positions (not index labels) so any index type works
        missing_positions = np.flatnonzero(df[column].isnull().to_numpy())
        gaps = np.diff(missing_positions)
        # An empty `gaps` (single missing value) counts as "all small"
        if bool(np.all(gaps <= ms_threshold)):
            columns_to_interpolate.append(column)
        else:
            columns_to_iterative_impute.append(column)

    # Replace missing values with interpolation
    if columns_to_interpolate:
        df[columns_to_interpolate] = df[columns_to_interpolate].interpolate(
            method='linear', limit_direction='both'
        )
        for column in columns_to_interpolate:
            log.info(f"Imputed '{column}' using 'linear interpolation' strategy.")

    # Replace missing values with iterative imputer
    if columns_to_iterative_impute:
        imputer = IterativeImputer()
        df[columns_to_iterative_impute] = imputer.fit_transform(df[columns_to_iterative_impute])
        for column in columns_to_iterative_impute:
            log.info(f"Imputed '{column}' using 'iterative' strategy.")

    return df

    
def _plot_correlation_heatmap(corr_matrix, stage, file_name, artifact_save_dir, log):
    """Draw one correlation heatmap; save and MLflow-log it when a directory is given."""
    fig = plt.figure(figsize=(8, 6))
    plt.title(f"Correlation Heatmap ({stage})")
    sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f")

    # Saving and MLflow logging both need a path, so they share one guard.
    if artifact_save_dir:
        plot_path = os.path.join(artifact_save_dir, file_name)
        fig.savefig(plot_path)
        log.info("Correlation heatmap (%s): %s", stage, plot_path)
        # NOTE(review): MLflow treats artifact_path as a destination directory;
        # kept unchanged so existing artifact locations do not move.
        mlflow.log_artifact(plot_path, artifact_path=file_name)

    plt.show()
    plt.close(fig)


def drop_highly_correlated_features(df, corr_threshold=0.8, plot_heatmaps=True, artifact_save_dir='artefacts'):
    """
    Perform feature selection based on Spearman correlation coefficient.

    Each pair of numeric/boolean columns is examined once. When the absolute
    correlation of a pair reaches ``corr_threshold``, the column with the
    larger mean absolute correlation to all other columns is dropped (the
    second column of the pair on a tie). Pairs involving a column that is
    already marked for dropping are skipped, so one column of every correlated
    pair survives. Non-numeric columns are ignored and left untouched.

    Parameters:
    - df: pandas DataFrame containing the dataset.
    - corr_threshold: The threshold for correlation above which features will be dropped (default is 0.8).
    - plot_heatmaps: Whether to plot heatmaps before and after dropping (default is True).
    - artifact_save_dir: Directory to save the correlation heatmap plots
      (default is 'artefacts'). When falsy, plots are shown but neither saved
      nor logged to MLflow.

    Returns:
    - A DataFrame with the highly correlated features dropped.
    """
    # Create a logger
    log = logging.getLogger(__name__)

    if artifact_save_dir:
        os.makedirs(artifact_save_dir, exist_ok=True)

    # Spearman must be requested explicitly (pandas defaults to Pearson).
    # Restricting to numeric/bool columns keeps this working on pandas
    # versions where DataFrame.corr raises on non-numeric data.
    corr_matrix = df.select_dtypes(include=['number', 'bool']).corr(method='spearman')

    if plot_heatmaps:
        _plot_correlation_heatmap(
            corr_matrix, "Before Dropping", "correlation_heatmap_before.png", artifact_save_dir, log
        )

    abs_corr = corr_matrix.abs()
    columns = list(abs_corr.columns)
    # Mean absolute correlation of each column with every *other* column
    mean_abs_corr = {col: abs_corr.loc[col].drop(col).mean() for col in columns}

    # Ordered list for logging plus a set for O(1) membership checks
    dropped_columns = []
    already_dropped = set()

    # Visit each unordered pair exactly once
    for position, col1 in enumerate(columns):
        for col2 in columns[position + 1:]:
            if col1 in already_dropped or col2 in already_dropped:
                continue
            # NaN correlations compare False and are therefore skipped
            if not abs_corr.loc[col1, col2] >= corr_threshold:
                continue
            loser = col1 if mean_abs_corr[col1] > mean_abs_corr[col2] else col2
            already_dropped.add(loser)
            dropped_columns.append(loser)

    # Drop the highly correlated features from the DataFrame
    df = df.drop(columns=dropped_columns)

    if plot_heatmaps:
        corr_matrix_after_drop = df.select_dtypes(include=['number', 'bool']).corr(method='spearman')
        _plot_correlation_heatmap(
            corr_matrix_after_drop, "After Dropping", "correlation_heatmap_after.png", artifact_save_dir, log
        )

    # Log the names of the dropped columns
    log.info("Dropped columns: %s", dropped_columns)

    return df

def drop_high_cardinality_features(df, max_unique_threshold=0.9):
    """
    Remove columns whose number of distinct values is too large.

    A column is removed when its count of unique values is greater than
    ``len(df) * max_unique_threshold``. When at least one column is removed,
    the removed names are recorded as the MLflow parameter
    "HighCardinalityColumns".

    Parameters:
        df (pd.DataFrame): The input DataFrame.
        max_unique_threshold (float): The maximum allowed fraction of unique values in a column (default is 0.9).

    Returns:
        pd.DataFrame: The DataFrame with high cardinality columns dropped.

    Raises:
        ValueError: If ``df`` is None.
    """
    if df is None:
        raise ValueError("Input DataFrame 'df' cannot be None.")

    # Largest number of distinct values a column may have and still be kept
    unique_limit = len(df) * max_unique_threshold

    # Collect the names of the columns that exceed the limit
    too_many_uniques = []
    for name in df.columns:
        if df[name].nunique() > unique_limit:
            too_many_uniques.append(name)

    # Record the dropped names in MLflow only when something is dropped
    if len(too_many_uniques) > 0:
        mlflow.log_param("HighCardinalityColumns", ', '.join(too_many_uniques))

    return df.drop(columns=too_many_uniques)

def select_categorical_columns(data):
    """
    List the names of the categorical columns of a DataFrame.

    Columns of dtype ``object`` or ``category`` are treated as categorical.

    Parameters:
    - data: pandas DataFrame containing the dataset.

    Returns:
    - A list of column names that are categorical, in DataFrame order.
    """
    categorical_dtypes = ['object', 'category']
    categorical_frame = data.select_dtypes(include=categorical_dtypes)
    column_index = categorical_frame.columns
    return column_index.tolist()

def custom_train_test_split(data, target_column, test_size=0.2, random_state=101, time_series=False):
    """
    Split the dataset into training and testing sets.

    Parameters:
    - data: pandas DataFrame containing the dataset.
    - target_column: Name of the target column.
    - test_size: Proportion of the dataset to include in the test split (default is 0.2).
    - random_state: Seed for random number generation (only used when time_series is False).
    - time_series: Set to True if the data is time series data (default is False).
      The rows are then sorted by index and the last ``test_size`` share of
      rows becomes the test set, without shuffling.

    Returns:
    - X_train, X_test, y_train, y_test: The split datasets.
    """
    if time_series:
        # Chronological split: earlier rows train, later rows test
        data = data.sort_index()  # Sort by time index if not already sorted
        split_index = int((1 - test_size) * len(data))
        train_part = data.iloc[:split_index]
        test_part = data.iloc[split_index:]
        # Remove the target by name; slicing off the last column is only
        # correct when the target happens to be stored last.
        X_train = train_part.drop(columns=[target_column])
        X_test = test_part.drop(columns=[target_column])
        y_train = train_part[target_column]
        y_test = test_part[target_column]
    else:
        # For regular (cross-sectional) data, use train_test_split
        X = data.drop(columns=[target_column])
        y = data[target_column]
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

    return X_train, X_test, y_train, y_test



if __name__ == "__main__":
    # This module only defines helper functions and no main(); calling the
    # undefined main() raised NameError when the file was run as a script.
    raise SystemExit(
        "data_prep_functions.py is a helper module and has no command-line entry point."
    )


In [None]:
import os

# Folder that receives the training module written by the next cell, which
# reads this path from `train_data_dir`.
train_data_dir = "./train_src"
# The original call referenced the undefined name `modules_dir` and was
# missing its closing parenthesis.
os.makedirs(train_data_dir, exist_ok=True)

In [None]:
%%writefile {train_data_dir}/training_functions.py
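# %%writefile {modules_dir}/tune_train_test.py  -- stale cell magic, disabled: only the first line of a cell can be a cell magic, so this line would be written into the module as invalid Python.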
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.experimental import enable_hist_gradient_boosting
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, TimeSeriesSplit
from sklearn.metrics import accuracy_score
from skopt import BayesSearchCV
from skopt.space import Real, Categorical, Integer
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
import logging
import os
import json
import joblib

# Configure root logging at INFO level and create the module-level logger
# used by the functions below.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)



def custom_train_test_split(data, target_column, test_size=0.2, random_state=101, time_series=False):
    """
    Split the dataset into training and testing sets.

    Parameters:
    - data: pandas DataFrame containing the dataset.
    - target_column: Name of the target column.
    - test_size: Proportion of the dataset to include in the test split (default is 0.2).
    - random_state: Seed for random number generation (only used when time_series is False).
    - time_series: Set to True if the data is time series data (default is False).
      The rows are then sorted by index and the last ``test_size`` share of
      rows becomes the test set, without shuffling.

    Returns:
    - X_train, X_test, y_train, y_test: The split datasets.
    """
    if time_series:
        # Chronological split: earlier rows train, later rows test
        data = data.sort_index()  # Sort by time index if not already sorted
        split_index = int((1 - test_size) * len(data))
        train_part = data.iloc[:split_index]
        test_part = data.iloc[split_index:]
        # Remove the target by name; slicing off the last column is only
        # correct when the target happens to be stored last.
        X_train = train_part.drop(columns=[target_column])
        X_test = test_part.drop(columns=[target_column])
        y_train = train_part[target_column]
        y_test = test_part[target_column]
    else:
        # For regular (cross-sectional) data, use train_test_split
        X = data.drop(columns=[target_column])
        y = data[target_column]
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

    return X_train, X_test, y_train, y_test



def hyperparameter_tuning(X_train, y_train, model_prefix:str, param_grid=None, random_search=False, bayesian_search=False, n_iter=10, random_seed=101):
    """
    Tune a Histogram Gradient Boosting Classifier pipeline and persist the result.

    The pipeline one-hot encodes object/category columns and passes the other
    columns through unchanged. The best pipeline is written to
    '<model_prefix>_best_model.joblib' and the best hyperparameters to
    '<model_prefix>_hyperparameters.json'.

    Parameters:
    - X_train, y_train: Training data and labels.
    - model_prefix: Prefix for model artifacts.
    - param_grid: Hyperparameter grid/space to search, keyed by pipeline
      parameter name (e.g. 'classifier__max_depth'). When None, a built-in
      default for the chosen search type is used.
    - random_search: Whether to use random search instead of grid search (default is False).
    - bayesian_search: Whether to use Bayesian hyperparameter search (default is False).
      Takes precedence over random_search.
    - n_iter: Number of parameter settings that are sampled (only for random_search or bayesian_search).
    - random_seed: Seed for the randomized and Bayesian searches (grid search is deterministic).

    Returns:
    - best_params: The best hyperparameters found by the search.
    - hyperparameters_filename: Path of the JSON file holding those hyperparameters.
    """
    # Identify categorical columns
    categorical_features = list(X_train.select_dtypes(include=['category', 'object']).columns)
    # One-hot encode categorical columns. handle_unknown='ignore' keeps CV folds
    # from failing on categories absent from the training fold, and
    # sparse_threshold=0 forces dense output, which the classifier requires.
    preprocessor = ColumnTransformer(
        transformers=[
            ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
        ],
        remainder='passthrough',  # Keep non-categorical columns as-is
        sparse_threshold=0
    )
    # Create a Histogram Gradient Boosting Classifier
    clf = HistGradientBoostingClassifier(random_state=42)

    # Combine preprocessing and classifier into a single pipeline
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', clf)
    ])

    if bayesian_search:
        # (low, high) ranges for BayesSearchCV
        search_space = param_grid if param_grid is not None else {
            'classifier__max_iter': (100, 300),
            'classifier__learning_rate': (0.001, 0.1),
            'classifier__max_depth': (3, 5),
            'classifier__l2_regularization': (0.0, 0.2)
        }
        search = BayesSearchCV(pipeline, search_space, n_iter=n_iter, cv=TimeSeriesSplit(n_splits=3), scoring='accuracy', n_jobs=-1, random_state=random_seed)
    else:
        # Discrete candidate values for grid search or random search
        search_space = param_grid if param_grid is not None else {
            'classifier__max_iter': [100, 200, 300],
            'classifier__learning_rate': [0.001, 0.01, 0.1],
            'classifier__max_depth': [3, 4, 5],
            'classifier__l2_regularization': [0.0, 0.1, 0.2]
        }
        if random_search:
            search = RandomizedSearchCV(pipeline, param_distributions=search_space, n_iter=n_iter, scoring='accuracy', n_jobs=-1, random_state=random_seed)
        else:
            # GridSearchCV is exhaustive and accepts no random_state argument
            search = GridSearchCV(pipeline, param_grid=search_space, scoring='accuracy', n_jobs=-1)

    # Fit the search to the training data
    search.fit(X_train, y_train)

    # Get the best hyperparameters and the best estimator (trained model)
    best_params = search.best_params_
    best_estimator = search.best_estimator_

    log.info('Parameters chosen are:')
    log.info(best_params)

    log.info('The best estimator is:')
    log.info(best_estimator)

    # Save the best model to a file
    model_filename = f'{model_prefix}_best_model.joblib'
    joblib.dump(best_estimator, model_filename)

    # Searches may return NumPy scalars, which json cannot serialize;
    # convert them to plain Python numbers for the file only.
    json_safe_params = {
        name: (value.item() if hasattr(value, 'item') else value)
        for name, value in best_params.items()
    }

    # Save best hyperparameters to a JSON file
    hyperparameters_filename = f'{model_prefix}_hyperparameters.json'
    log.info(f'Saving best hyperparameters for {model_prefix} as {hyperparameters_filename}')
    with open(hyperparameters_filename, 'w') as f:
        json.dump(json_safe_params, f)

    return best_params, hyperparameters_filename

def _build_preprocessor(X):
    """Return an unfitted ColumnTransformer that one-hot encodes the object/category columns of X."""
    categorical_features = list(X.select_dtypes(include=['category', 'object']).columns)
    return ColumnTransformer(
        transformers=[
            # Categories unseen during fitting are encoded as all zeros
            ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
        ],
        remainder='passthrough',  # Keep non-categorical columns as-is
        sparse_threshold=0  # Always dense: the classifier does not accept sparse input
    )


def train_model(X_train, y_train, model_name:str, hyperparam: dict=None, hyperparam_filename: str=None):
    """
    Train a Histogram Gradient Boosting Classifier with given hyperparameters.

    The one-hot encoder and the classifier are fitted together as one
    scikit-learn Pipeline (steps 'preprocessor' and 'classifier') so that the
    encoding learned on the training data is reused at prediction time.

    Parameters:
    - X_train, y_train: Training data (DataFrame) and labels.
    - model_name: File path the fitted pipeline is saved to with joblib.
    - hyperparam: Dict with the keys 'classifier__max_iter',
      'classifier__learning_rate', 'classifier__max_depth' and
      'classifier__l2_regularization'. Used when hyperparam_filename is None.
    - hyperparam_filename: Path of a JSON file holding such a dict. Takes
      precedence over hyperparam.

    Returns:
    - The fitted Pipeline. It exposes predict() and accepts the raw,
      un-encoded feature DataFrame.

    Raises:
    - ValueError: If neither hyperparam nor hyperparam_filename is given.
    """
    # Resolve the hyperparameters first so bad input fails before any fitting
    if hyperparam_filename is not None:
        log.info(f'Loading in hyperparameters: {hyperparam_filename}')
        with open(hyperparam_filename, 'r') as f:
            best_params = json.load(f)
    elif hyperparam is not None:
        best_params = hyperparam
    else:
        raise ValueError('Either hyperparam or hyperparam_filename must be assigned')

    classifier = HistGradientBoostingClassifier(class_weight='balanced',
        max_iter=best_params['classifier__max_iter'],
        learning_rate=best_params['classifier__learning_rate'],
        max_depth=best_params['classifier__max_depth'],
        l2_regularization=best_params['classifier__l2_regularization'],
        random_state=10
    )
    trained_model = Pipeline([
        ('preprocessor', _build_preprocessor(X_train)),
        ('classifier', classifier)
    ])

    # Create and train the model with the specified hyperparameters
    log.info('Training Model')
    trained_model.fit(X_train, y_train)

    # Save the trained model to a file
    log.info(f'Saving {model_name}')
    joblib.dump(trained_model, model_name)

    return trained_model

def predict_model(trained_model, X_test, inference_col_name):
    """
    Predict using a trained machine learning model.

    When ``trained_model`` is a Pipeline containing a 'preprocessor' step (as
    returned by train_model), that fitted preprocessor encodes ``X_test``.
    For a bare estimator, an encoder is fitted on ``X_test`` itself as a
    fallback; this only matches the training features if ``X_test`` contains
    the same categories as the training data.

    Parameters:
    - trained_model: The trained machine learning model.
    - X_test: The test dataset on which to make predictions. Assumed to have
      the same columns and dtypes as the training data.
    - inference_col_name: The name of the column to store predictions in the inference DataFrame.

    Returns:
    - inference_df: The DataFrame containing the encoded features and the predictions.
    - inference_col_name: The name of the column where predictions are stored.
    - predictions: The predictions made by the model.
    """
    # Identify categorical and pass-through columns
    categorical_features = list(X_test.select_dtypes(include=['category', 'object']).columns)
    passthrough_features = list(X_test.select_dtypes(exclude=['category', 'object']).columns)

    pipeline_steps = getattr(trained_model, 'named_steps', None)
    if pipeline_steps is not None and 'preprocessor' in pipeline_steps:
        # Reuse the encoding learned during training
        preprocessor = pipeline_steps['preprocessor']
        X_test_transformed = preprocessor.transform(X_test)
        predictions = trained_model.predict(X_test)
    else:
        log.warning('trained_model has no fitted preprocessor; fitting the encoder on X_test')
        preprocessor = _build_preprocessor(X_test)
        X_test_transformed = preprocessor.fit_transform(X_test)
        predictions = trained_model.predict(X_test_transformed)

    # Get the one-hot encoded feature names; with no categorical columns the
    # encoder is never fitted and cannot report feature names.
    if categorical_features:
        ohe = preprocessor.named_transformers_['cat']
        cat_feature_names = list(ohe.get_feature_names_out(input_features=categorical_features))
    else:
        cat_feature_names = []

    # Combine the one-hot encoded feature names and non-categorical column names
    all_column_names = cat_feature_names + passthrough_features

    # Convert X_test_transformed to a DataFrame with appropriate column names
    inference_df = pd.DataFrame(X_test_transformed, columns=all_column_names)

    # Add predictions to the DataFrame with the specified column name
    inference_df[inference_col_name] = predictions

    return inference_df, inference_col_name, predictions


if __name__ == "__main__":
    # This module only defines helper functions and no main(); calling the
    # undefined main() raised NameError when the file was run as a script.
    raise SystemExit(
        "training_functions.py is a helper module and has no command-line entry point."
    )

In [None]:
%%writefile train.yml
# <component>
# Azure ML command component definition (see the schema URL below).
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_HGboost_classification_model
display_name: HistGradientBoostingModel
version: 1
type: command
inputs:
  # Folder handed to the script as --training_data.
  training_data: 
    type: uri_folder
outputs:
  # Handed to the script as --model_output.
  model_output:
    type: mlflow_model
  # Folder handed to the script as --test_data.
  test_data:
    type: uri_folder
# Folder containing the code the command runs.
# NOTE(review): the command below runs train.py, which is not shown in this
# notebook - confirm it exists in ./train_src.
code: ./train_src
environment: azureml:general_environment:0.4.0
command: >-
  python train.py 
  --training_data ${{inputs.training_data}} 
  --test_data ${{outputs.test_data}} 
  --model_output ${{outputs.model_output}}
# </component>