#### Mount Correct Folder

In [None]:
# Switch the working directory before anything else runs: the import of
# `modules.model_training` and the relative data folders ('00_RawData/', ...)
# used below are resolved from here. Replace the placeholder with the real path.
%cd #INSERT FOLDER PATH

#### Import Modules

In [None]:
from modules.model_training import Model_Trainer
import pandas as pd

#### Setup & Run Training Pipeline

In [None]:
# Model input features, grouped by where the information comes from.
feature_columns = [
    # Order information
    'ProductCode', 'Code', 'Previous_ProductCode', 'OrderQuantity',
    # Product information
    'FS_Breite', 'FS_Länge', 'FS_Tiefe', 'PBL_Breite', 'PBL_Länge',
    'Tuben_Durchmesser', 'CALC_PACKGROESSE', 'Tuben_Länge',
    'CALC_WIRKSTOFF', 'CALC_ALUFOLIE',
    # Historic product-change information
    '10th_Percentile_Auftragswechsel', '10th_Percentile_Primär',
    '10th_Percentile_Sekundär',
]

In [None]:
def extract_error_summary_by_experiment(result_dicts):
    """Summarise the validation RMSE per target for every experiment.

    Args:
        result_dicts: Iterable of experiments. Each experiment is an iterable
            of result entries providing the keys ``'target'`` and
            ``'val_RMSE'``.

    Returns:
        A DataFrame with one row per experiment (labelled ``'Experiment <n>'``,
        counted from 1) and the columns ``<target>_min``, ``<target>_mean``
        and ``<target>_max``, aggregated over all entries of that target.
    """
    records = []

    for number, experiment_results in enumerate(result_dicts, start=1):
        # Gather every RMSE reported for a target, whatever model produced it.
        rmse_by_target = {}
        for entry in experiment_results:
            rmse_by_target.setdefault(entry['target'], []).append(entry['val_RMSE'])

        record = {'experiment': f'Experiment {number}'}
        for target_name, values in rmse_by_target.items():
            record.update({
                f'{target_name}_min': min(values),
                f'{target_name}_mean': sum(values) / len(values),
                f'{target_name}_max': max(values),
            })
        records.append(record)

    return pd.DataFrame(records)

In [None]:
# Lift pandas' display truncation so result tables are always shown in full.
pd.options.display.max_rows = None  # no row limit
pd.options.display.max_columns = None  # no column limit

In [None]:
def extract_error_stats(result_dicts):
    """Compare the validation RMSE of each (model type, target) across two runs.

    The RMSE values of every (type, target) combination are collected in the
    order the experiments are given. The first value is reported as
    ``scaling_off`` and the second as ``scaling_on``, so callers are expected
    to pass the run without scaling first.

    Args:
        result_dicts: Iterable of experiments. Each experiment is an iterable
            of result entries providing the keys ``'type'``, ``'target'`` and
            ``'val_RMSE'``.

    Returns:
        A DataFrame with the columns ``type``, ``target``, ``scaling_off``,
        ``scaling_on`` and ``diff_rmse`` (``scaling_on - scaling_off``).
        Combinations that occur only once get NaN for ``scaling_on`` and
        ``diff_rmse`` instead of raising an IndexError. Empty input yields an
        empty frame that still carries these columns.
    """
    # Keyed grouping keeps first-seen order and avoids rescanning all rows
    # for every single result entry.
    rmse_by_model = {}
    for result_model_performance in result_dicts:
        for result in result_model_performance:
            key = (result['type'], result['target'])
            rmse_by_model.setdefault(key, []).append(result['val_RMSE'])

    summary_rows = []
    for (model_type, target), rmses in rmse_by_model.items():
        scaling_off = rmses[0]
        # Without a second run there is nothing to compare against.
        scaling_on = rmses[1] if len(rmses) > 1 else float('nan')
        summary_rows.append({
            'type': model_type,
            'target': target,
            'scaling_off': scaling_off,
            'scaling_on': scaling_on,
            'diff_rmse': scaling_on - scaling_off,
        })

    columns = ['type', 'target', 'scaling_off', 'scaling_on', 'diff_rmse']
    return pd.DataFrame(summary_rows, columns=columns)

In [None]:
def extract_rows(result_dicts):
    """Collect the validation RMSE values per experiment, model type and target.

    Args:
        result_dicts: Iterable of experiments. Each experiment is an iterable
            of result entries providing the keys ``'type'``, ``'target'`` and
            ``'val_RMSE'``.

    Returns:
        A DataFrame with the columns ``experiment`` (``'Experiment <n>'``,
        counted from 1), ``type``, ``target`` and ``val_RMSEs``. The last
        column holds the list of all RMSE values reported for that
        combination within the experiment. Rows keep first-seen order.
    """
    rows = []
    for i, result_model_performance in enumerate(result_dicts, 1):
        experiment = f'Experiment {i}'
        # Rows are only ever merged within one experiment, so a per-experiment
        # index replaces the scan over every row collected so far.
        rows_by_model = {}
        for result in result_model_performance:
            model_type = result['type']
            target = result['target']
            val_rmse = result['val_RMSE']

            row = rows_by_model.get((model_type, target))
            if row is None:
                row = {
                    'experiment': experiment,
                    'type': model_type,
                    'target': target,
                    'val_RMSEs': [],
                }
                rows_by_model[(model_type, target)] = row
                rows.append(row)
            row['val_RMSEs'].append(val_rmse)

    return pd.DataFrame(rows, columns=['experiment', 'type', 'target', 'val_RMSEs'])

#### Hyperparameter Optimization

In [None]:
# Baseline trainer for the HPO comparison: configured exactly like
# `model_trainer_hpo` except that `model_optimization_do` is False.
# NOTE(review): `optimization_mode` is still passed here; presumably it is
# ignored while optimization is disabled — confirm in Model_Trainer.
model_trainer_no_hpo = Model_Trainer(
    raw_data_folder_path = '00_RawData/',
    integrated_data_path = '01_IntegratedData/',
    feature_data_path= '02_FeatureData/',
    preprocessed_data_folder_path='03_Preprocessed_FeatureData/',
    frontend_reference_folder_path='04_Frontend_ReferenceData/',
    feature_list=feature_columns,
    model_targets=['OEE', 'PERF', 'AVAIL', 'QUAL', 'DT', 'APT', 'PBT'], # Possible / Tested = ['OEE', 'PERF', 'AVAIL', 'QUAL', 'DT', 'APT', 'PBT']
    models_to_train=['linear', 'ridge', 'poly', 'dt', 'rf', 'svr', 'xgb', 'catboost', 'lgbm', 'NN'], # Possible / Tested = ['linear', 'ridge', 'poly', 'dt', 'rf', 'xgb', 'svr', 'catboost', 'lgbm', 'NN']
    validation_ratio=0.1,
    scaling_enabled=False,
    product_encoding_method='ordinal',
    save_models=False,
    model_optimization_do=False,
    optimization_mode='optunasearch',
    model_test_name='HYPERPARAMETER_OPTIMIZATION'
)

In [None]:
# Optimized trainer for the HPO comparison: configured exactly like
# `model_trainer_no_hpo` except that `model_optimization_do` is True, using
# the 'optunasearch' optimization mode.
model_trainer_hpo = Model_Trainer(
    raw_data_folder_path = '00_RawData/',
    integrated_data_path = '01_IntegratedData/',
    feature_data_path= '02_FeatureData/',
    preprocessed_data_folder_path='03_Preprocessed_FeatureData/',
    frontend_reference_folder_path='04_Frontend_ReferenceData/',
    feature_list=feature_columns,
    model_targets=['OEE', 'PERF', 'AVAIL', 'QUAL', 'DT', 'APT', 'PBT'], # Possible / Tested = ['OEE', 'PERF', 'AVAIL', 'QUAL', 'DT', 'APT', 'PBT']
    models_to_train=['linear', 'ridge', 'poly', 'dt', 'rf', 'svr', 'xgb', 'catboost', 'lgbm', 'NN'], # Possible / Tested = ['linear', 'ridge', 'poly', 'dt', 'rf', 'xgb', 'svr', 'catboost', 'lgbm', 'NN']
    validation_ratio=0.1,
    scaling_enabled=False,
    product_encoding_method='ordinal',
    save_models=False,
    model_optimization_do=True,
    optimization_mode='optunasearch',
    model_test_name='HYPERPARAMETER_OPTIMIZATION'
)

In [None]:
# Train the baseline models. The cells below iterate over the returned value
# and read the keys 'type', 'target', 'y_val', 'y_pred', 'X_val' and 'model'
# from each entry.
results_no_hpo = model_trainer_no_hpo.run_training_pipeline(verbose_train=0, verbose_test=2)

In [None]:
# Train the hyperparameter-optimized models. The comparison cells below read
# the keys 'type', 'target', 'y_val' and 'y_pred' from each returned entry.
results_hpo = model_trainer_hpo.run_training_pipeline(verbose_train=0, verbose_test=2)

In [None]:
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Helper function to calculate RMSE, MAE, and R2
def calculate_metrics(y_true, y_pred):
    """Return ``(rmse, mae, r2)`` for the given true and predicted values.

    ``mean_squared_error(..., squared=False)`` was deprecated in
    scikit-learn 1.4 and removed in 1.6. RMSE is therefore computed with
    ``root_mean_squared_error`` where it exists, and with the legacy keyword
    only on older scikit-learn versions. Both paths give the same value.
    """
    try:
        from sklearn.metrics import root_mean_squared_error
    except ImportError:  # scikit-learn < 1.4
        rmse = mean_squared_error(y_true, y_pred, squared=False)
    else:
        rmse = root_mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    return rmse, mae, r2

# Function to revalidate models and generate the table for multiple targets
def revalidate_models_all_targets(result_dict_no_opt=None, result_dict_opt=None):
    """Recompute RMSE, MAE and R2 per model, optionally against optimized runs.

    Args:
        result_dict_no_opt: Iterable of result entries of the non-optimized
            models (required despite the ``None`` default). Each entry
            provides ``'type'``, ``'target'``, ``'y_val'`` and ``'y_pred'``.
        result_dict_opt: Optional iterable of result entries of the optimized
            models with the same keys. When omitted, only the non-optimized
            metrics are reported.

    Returns:
        A pivot table indexed by ``Target``, ``Model Type`` and ``Metric``
        with the value columns ``Non-Optimized``, ``Optimized`` and
        ``Difference`` (optimized minus non-optimized).

    Optimized entries are matched to non-optimized ones by their
    ``(type, target)`` pair rather than by list position, so a different
    ordering or length of the two result lists can no longer pair up
    unrelated models. Non-optimized entries without an optimized counterpart
    are reported on stdout and skipped.
    """
    metric_names = ('RMSE', 'MAE', 'R2')
    all_results = []

    if result_dict_opt is None:
        for model_entry in result_dict_no_opt:
            scores = calculate_metrics(model_entry['y_val'], model_entry['y_pred'])
            for metric, value in zip(metric_names, scores):
                all_results.append([model_entry['target'], model_entry['type'], metric, value, None, None])
    else:
        # First entry per (type, target) wins, matching the 'first'
        # aggregation of the pivot table below.
        optimized_by_key = {}
        for model_entry_opt in result_dict_opt:
            key = (model_entry_opt['type'], model_entry_opt['target'])
            optimized_by_key.setdefault(key, model_entry_opt)

        for model_entry_no_opt in result_dict_no_opt:
            model_type = model_entry_no_opt['type']
            target = model_entry_no_opt['target']
            model_entry_opt = optimized_by_key.get((model_type, target))
            if model_entry_opt is None:
                print(f"No optimized result for model type '{model_type}' and target '{target}'. Skipping.")
                continue

            scores_no_opt = calculate_metrics(model_entry_no_opt['y_val'], model_entry_no_opt['y_pred'])
            scores_opt = calculate_metrics(model_entry_opt['y_val'], model_entry_opt['y_pred'])
            for metric, value_no_opt, value_opt in zip(metric_names, scores_no_opt, scores_opt):
                all_results.append([target, model_type, metric, value_no_opt, value_opt, value_opt - value_no_opt])

    # Create DataFrame to present the results
    df_results = pd.DataFrame(all_results, columns=['Target', 'Model Type', 'Metric', 'Non-Optimized', 'Optimized', 'Difference'])

    # Pivot table to present it in the required format, with "Target" as the first column
    df_pivot = df_results.pivot_table(index=['Target', 'Model Type', 'Metric'], values=['Non-Optimized', 'Optimized', 'Difference'], aggfunc='first')

    return df_pivot

In [None]:
# Per-model metric comparison: baseline results (`results_no_hpo`) against the
# hyperparameter-optimized results (`results_hpo`).
df_pivot = revalidate_models_all_targets(results_no_hpo, results_hpo)
df_pivot

### Composite Analysis

In [None]:
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Helper function to calculate RMSE, MAE, and R2
def calculate_metrics(y_true, y_pred):
    """Return ``(rmse, mae, r2)`` for the given true and predicted values.

    ``mean_squared_error(..., squared=False)`` was deprecated in
    scikit-learn 1.4 and removed in 1.6. RMSE is therefore computed with
    ``root_mean_squared_error`` where it exists, and with the legacy keyword
    only on older scikit-learn versions. Both paths give the same value.
    """
    try:
        from sklearn.metrics import root_mean_squared_error
    except ImportError:  # scikit-learn < 1.4
        rmse = mean_squared_error(y_true, y_pred, squared=False)
    else:
        rmse = root_mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    return rmse, mae, r2

# Function to calculate composite OEE metrics
def _group_entries_by_type_and_target(result_dict):
    """Index result entries as ``{model type: {target: entry}}`` (last entry wins)."""
    grouped = {}
    for model_entry in result_dict:
        grouped.setdefault(model_entry['type'], {})[model_entry['target']] = model_entry
    return grouped


def _composite_oee_metrics(target_to_entry):
    """Score the PERF * AVAIL * QUAL predictions against the OEE validation values."""
    import numpy as np

    def as_series(values):
        # Flattening gives a fresh 0..n-1 index (so the factors align by
        # position) and also accepts plain arrays or (n, 1) predictions.
        return pd.Series(np.asarray(values).ravel())

    oee_composite = (
        as_series(target_to_entry['PERF']['y_pred'])
        * as_series(target_to_entry['AVAIL']['y_pred'])
        * as_series(target_to_entry['QUAL']['y_pred'])
    )
    y_val_oee = as_series(target_to_entry['OEE']['y_val'])
    return calculate_metrics(y_val_oee, oee_composite)


def calculate_composite_metrics(result_dict_no_opt=None, result_dict_opt=None):
    """Evaluate a composite OEE built from the PERF, AVAIL and QUAL predictions.

    For every model type the predictions of its PERF, AVAIL and QUAL models
    are multiplied element-wise and compared with the validation values of
    its OEE model.
    NOTE(review): the plain product presumes the three factors are fractions
    on the same scale as OEE — confirm against the training targets.

    Args:
        result_dict_no_opt: Iterable of result entries of the non-optimized
            models (required despite the ``None`` default). Each entry
            provides ``'type'``, ``'target'``, ``'y_val'`` and ``'y_pred'``.
        result_dict_opt: Optional iterable of result entries of the optimized
            models with the same keys.

    Returns:
        A pivot table indexed by ``Target`` (always ``'OEE_composite'``),
        ``Model Type`` and ``Metric`` with the value columns
        ``Non-Optimized``, ``Optimized`` and ``Difference`` (optimized minus
        non-optimized). Model types lacking one of the four required targets
        are reported on stdout and skipped.
    """
    required_targets = ('PERF', 'AVAIL', 'QUAL', 'OEE')
    metric_names = ('RMSE', 'MAE', 'R2')
    all_results = []

    def has_required_targets(target_to_entry):
        return all(t in target_to_entry for t in required_targets)

    entries_no_opt = _group_entries_by_type_and_target(result_dict_no_opt)

    if result_dict_opt is None:
        # Only non-optimized models provided
        for model_type, target_to_entry in entries_no_opt.items():
            if not has_required_targets(target_to_entry):
                print(f"Model type '{model_type}' does not have all required targets. Skipping.")
                continue
            scores = _composite_oee_metrics(target_to_entry)
            for metric, value in zip(metric_names, scores):
                all_results.append(['OEE_composite', model_type, metric, value, None, None])
    else:
        # Both optimized and non-optimized models provided
        entries_opt = _group_entries_by_type_and_target(result_dict_opt)
        for model_type, target_to_entry_no_opt in entries_no_opt.items():
            target_to_entry_opt = entries_opt.get(model_type, {})
            if not (has_required_targets(target_to_entry_no_opt) and has_required_targets(target_to_entry_opt)):
                print(f"Model type '{model_type}' does not have all required targets in both optimized and non-optimized models. Skipping.")
                continue
            scores_no_opt = _composite_oee_metrics(target_to_entry_no_opt)
            scores_opt = _composite_oee_metrics(target_to_entry_opt)
            for metric, value_no_opt, value_opt in zip(metric_names, scores_no_opt, scores_opt):
                all_results.append(['OEE_composite', model_type, metric, value_no_opt, value_opt, value_opt - value_no_opt])

    # Create DataFrame
    df_results = pd.DataFrame(all_results, columns=['Target', 'Model Type', 'Metric', 'Non-Optimized', 'Optimized', 'Difference'])
    df_pivot = df_results.pivot_table(index=['Target', 'Model Type', 'Metric'], values=['Non-Optimized', 'Optimized', 'Difference'], aggfunc='first')

    return df_pivot

In [None]:
# Stack the per-target comparison and the composite-OEE comparison into one
# table; the composite rows carry the target label 'OEE_composite'.
df_pivot = revalidate_models_all_targets(results_no_hpo, results_hpo)
df_pivot = pd.concat([df_pivot, calculate_composite_metrics(results_no_hpo, results_hpo)])
df_pivot

### Correlation Matrix

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error

# Function to calculate errors for each sample
def calculate_sample_errors(result_model_performance):
    """Compute the per-sample residuals (``y_val - y_pred``) of every model.

    Args:
        result_model_performance: Iterable of result entries providing the
            keys ``'type'``, ``'target'``, ``'y_val'`` and ``'y_pred'``.

    Returns:
        A list with one dict per entry, holding ``'type'``, ``'target'`` and
        ``'sample_errors'`` (a 1-D NumPy array with one residual per sample).
    """
    errors = []

    for entry in result_model_performance:
        # Flatten both sides BEFORE subtracting: an (n,) target minus an
        # (n, 1) prediction would otherwise broadcast to an (n, n) matrix and
        # produce n*n bogus "errors".
        y_val = np.asarray(entry['y_val']).ravel()
        y_pred = np.asarray(entry['y_pred']).ravel()
        errors.append({
            'type': entry['type'],
            'target': entry['target'],
            'sample_errors': y_val - y_pred
        })

    return errors

# Function to create a DataFrame of errors for a specific target
def create_error_df(errors, target, models):
    """Build a DataFrame of per-sample errors for one target.

    Args:
        errors: Iterable of dicts with the keys ``'type'``, ``'target'`` and
            ``'sample_errors'`` (as produced by ``calculate_sample_errors``).
        target: Target whose entries are selected.
        models: Collection of model types to include.

    Returns:
        A DataFrame with one column per selected model type and one row per
        sample. If a model type occurs more than once, its last entry wins.

    Raises:
        ValueError: If no non-empty errors exist for the target, or if the
            selected models do not share the same number of samples.
    """
    error_dict = {}

    for entry in errors:
        if entry['target'] == target and entry['type'] in models:
            error_dict[entry['type']] = entry['sample_errors']

    # Filter out entries with empty lists
    error_dict = {k: v for k, v in error_dict.items() if len(v) > 0}

    # Previously an empty selection was reported as "Inconsistent sample
    # sizes", which pointed at the wrong problem.
    if not error_dict:
        raise ValueError("No sample errors available for the target: {}".format(target))

    # Check for consistent lengths
    if len({len(v) for v in error_dict.values()}) != 1:
        raise ValueError("Inconsistent sample sizes across models for the target: {}".format(target))

    return pd.DataFrame(error_dict)

# Function to compute and plot correlation of sample errors
def analyze_sample_errors(errors, targets, models):
    """Plot, for every target, how the models' per-sample errors correlate.

    Args:
        errors: Per-model sample errors, passed on to ``create_error_df``.
        targets: Iterable of targets; one heatmap is drawn per target.
        models: Model types to include in each correlation matrix.

    A target whose processing raises ``ValueError`` is reported on stdout
    and skipped; nothing is returned.
    """
    for target in targets:
        try:
            error_df = create_error_df(errors, target, models)
            if error_df.empty:
                continue

            correlations = error_df.corr()
            labels = correlations.columns

            plt.figure(figsize=(12, 10), dpi=300)
            sns.heatmap(
                correlations,
                annot=True,
                cmap='coolwarm',
                vmin=-1,
                vmax=1,
                fmt='.3f',
                xticklabels=labels,
                yticklabels=labels,
                annot_kws={"size": 12},  # font size of the cell annotations
            )
            plt.show()
        except ValueError as e:
            print(f"Skipping target {target}: {e}")

# Targets and model types to include in the error-correlation heatmaps.
targets = ['OEE'] # Possible / Tested = ['OEE', 'PERF', 'AVAIL', 'QUAL', 'DT', 'APT', 'PBT']
models = ['linear', 'ridge', 'poly', 'dt', 'rf', 'svr', 'xgb', 'catboost', 'lgbm', 'NN'] # Possible / Tested = ['linear', 'ridge', 'poly', 'dt', 'rf', 'xgb', 'svr', 'catboost', 'lgbm', 'NN']

# Per-sample residuals (y_val - y_pred) of the non-optimized models.
sample_errors = calculate_sample_errors(results_no_hpo)

# Draw one correlation heatmap per selected target.
analyze_sample_errors(sample_errors, targets, models)

### Explainability Charts

In [None]:
import shap
from catboost import Pool
from IPython.display import display, HTML

def explain_model_prediction(model_key, model, X_train, X_test_instance):
    """Draw SHAP summary and dependence plots for a fitted model.

    Args:
        model_key: Model type identifier. Plots are drawn for ``'rf'``,
            ``'xgb'``, ``'lgbm'`` and ``'catboost'``; ``'svr'`` only prints a
            notice.
        model: The fitted model object.
        X_train: Training features. Kept for interface compatibility; the
            SHAP values are computed on ``X_test_instance`` only.
        X_test_instance: DataFrame with the rows to explain.

    Returns:
        None. The plots are rendered as a side effect.
    """
    if model_key == 'svr':
        # No SHAP values exist for this model, so there is nothing to plot.
        print("SHAP explanations for SVR or other non-tree models need a different approach.")
        return

    if model_key == 'catboost':
        # Explain the rows that are plotted. Computing the values on the
        # training pool and slicing them to len(X_test_instance) only matched
        # when both frames happened to be identical.
        shap_values = model.get_feature_importance(data=Pool(X_test_instance), type='ShapValues')
        shap_values = shap_values[:, :-1]  # Remove the last column (base value)
    else:
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_test_instance)

    # Plot the SHAP values for the test instance
    shap.initjs()
    if model_key in ['rf', 'xgb', 'lgbm', 'catboost']:
        shap.summary_plot(shap_values, X_test_instance, feature_names=X_test_instance.columns, plot_type="bar")
        shap.summary_plot(shap_values, X_test_instance, feature_names=X_test_instance.columns)
        for feature in ('OrderQuantity', '10th_Percentile_Auftragswechsel', 'FS_Breite'):
            # A feature may have been removed from the feature list; skip it
            # rather than abort the remaining plots.
            if feature in X_test_instance.columns:
                shap.dependence_plot(feature, shap_values, X_test_instance, interaction_index=None)
            else:
                print(f"Feature '{feature}' not found in the data. Skipping dependence plot.")

In [None]:
# Explain only the CatBoost model trained for the OEE target. Its validation
# features ('X_val') are passed as both the training and the test frame.
for model in results_no_hpo:
    if model['type'] == 'catboost' and model['target']=='OEE':
        #print(model['X_val'].columns)
        print(f"Starting analysis for model {model['type']} for target {model['target']}")
        explain_model_prediction(model['type'], model['model'], pd.DataFrame(model['X_val']), pd.DataFrame(model['X_val']))

### Model Evaluation

In [None]:
import matplotlib.pyplot as plt
import scienceplots
import pandas as pd

# Use the 'science' style for all following plots.
# NOTE(review): the style is presumably registered by the `scienceplots`
# import in this cell — keep that import even though it looks unused.
plt.style.use('science')

In [None]:
# Tabulate the result entries so they can be filtered by target and error metric.
# NOTE(review): `results` is not assigned anywhere in the visible notebook
# (only `results_no_hpo` and `results_hpo` are) — confirm which result list is
# meant here, otherwise this line raises a NameError on a fresh run.
model_effiency_analysis = pd.DataFrame(results)

# Plotting function
def plot_model_performance(dataframe, error_function):
    """Draw one bar chart of model errors per target and collect the best models.

    Args:
        dataframe: DataFrame with the columns ``'target'``, ``'type'``,
            ``'model'``, ``'X_val'`` and the column named by
            ``error_function``.
        error_function: Name of the error column to compare (lower is better).

    Returns:
        A list with one dict per target holding ``target``, ``model``,
        ``name``, ``error``, ``error_name`` and ``X_val``. ``model`` and
        ``X_val`` are pandas Series covering every row that reaches the
        minimum error; ``name`` is the type of the first such row.
    """
    best_models = []

    for target in dataframe['target'].unique():
        # Rows of this target, ordered by model type for a stable x-axis.
        subset = dataframe.loc[dataframe['target'] == target].sort_values(by='type')
        scores = subset[error_function]

        min_error = scores.min()
        winners = subset[scores == min_error]

        best_models.append({
            "target": target,
            "model": winners['model'],
            "name": winners['type'].values[0],
            "error": min_error,
            "error_name": error_function,
            "X_val": winners['X_val'],
        })

        # Highlight the best-performing bar(s) in green.
        bar_colors = ['green' if score == min_error else 'skyblue' for score in scores]

        plt.figure(figsize=(10, 6))
        plt.bar(subset['type'], scores, color=bar_colors)

        plt.title(f'Model Performance for Target: {target}')
        plt.xlabel('Model Type')
        plt.ylabel(error_function)
        plt.xticks(rotation=90)  # vertical tick labels

        plt.show()

    return best_models

# Plot the validation RMSE of every model per target and keep the best model
# of each target in `model_eval`.
model_eval = plot_model_performance(model_effiency_analysis, 'val_RMSE')
# To rank by a different metric, pass its column name instead, e.g. 'MAE'.
# NOTE(review): confirm the result entries actually expose that column.