In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    confusion_matrix, classification_report,
    accuracy_score, ConfusionMatrixDisplay
)
import mlflow as mlf
from mlflow.sklearn import log_model, save_model
from mlflow.models import infer_signature
import os

In [None]:
class Dataset_Analyzer:
    """Load a CSV dataset, prepare it, and train/evaluate a classifier on it.

    ``main`` runs the whole pipeline:
      1. drop object-dtype (string) columns;
      2. fill missing values of numeric columns with the column mean;
      3. split columns into features X and target y, max-abs normalize X and
         drop ``excluded_features_X``;
      4. split 80/20 into train/test (``random_state=42``);
      5. build MLflow dataset objects for both splits;
      6. train the selected model, predict on the test split and compute
         confusion matrix, classification report, accuracy and
         weighted-average precision/recall/F1, plus a confusion-matrix figure.

    NOTE(review): the mean imputation and max-abs statistics are computed on
    the full dataset *before* the train/test split, so test rows influence
    them. Max-abs scaling does not affect tree models such as
    RandomForestClassifier, but the imputed means do. Also, an ID-like column
    that survives the string-column removal (presumably ``PassengerId`` for
    the Titanic CSV) would be used as a feature — confirm this is intended.
    """

    def __init__(self, dataset_url: str):
        """Download the CSV at ``dataset_url`` and initialise pipeline state."""
        self.CSV_dataset_URL = dataset_url
        self.CSV_dataset = self.get_dataset_in_CSV_from_URL(self.CSV_dataset_URL)
        self.dataset_column_names = self.get_dataset_columns_names()
        self.dataset_columns_types = dict(self.CSV_dataset.dtypes)
        self.dataset_columns_with_missing_values = []
        self.missing_values_columns_with_int_float_dtypes = []

        # Column(s) to predict, and feature columns left out of training.
        self.target_y_columns = ['Survived']
        self.excluded_features_X = ['SibSp', 'Parch']
        self.features_set_X = pd.DataFrame()
        self.target_set_y = pd.DataFrame()
        self.train_set_X = pd.DataFrame()
        # Real empty arrays rather than the `np.array` function object.
        self.train_set_y = np.array([])
        self.test_set_X = pd.DataFrame()
        self.test_set_y = np.array([])
        # Display names in the order of the model's sorted class labels
        # (assumes binary labels 0/1 — confirm for other datasets).
        self.target_y_classes_names = ['Not Survived', 'Survived']
        self.datasets_for_mlflow = {'train': {}, 'test': {}}

        # Registry of available classifier classes; more models may be added.
        self.ML_classifiers_model = {'random_forest': RandomForestClassifier}
        self.selected_model_name_for_experiment = 'random_forest'
        self.model_for_experiment = None  # set by init_classifier_model_for_experiment
        self.prediction_of_target_y = []
        self.confusion_matrix = []
        self.confusion_matrix_figure = None  # set by define_confusion_matrix_figure
        self.classification_report = {}
        self.accuracy = 0.0
        self.weighted_average_precision = 0.0
        self.weighted_average_recall = 0.0
        self.weighted_average_f1_score = 0.0

    def main(self):
        """Prepare the data, train the selected model and evaluate it."""
        self.make_data_preparation_for_training()
        print('Training ML model.')
        self.init_classifier_model_for_experiment(self.selected_model_name_for_experiment)
        self.train_classifier_model()
        print('Make prediction of target y values from test set X')
        self.make_prediction_on_test_dataset()
        print('Model Evaluation:')
        self.evaluate_quality_of_ML_model()
        self.define_confusion_matrix_figure()

    def make_data_preparation_for_training(self):
        """Clean, normalize and split the dataset, then build MLflow datasets."""
        self.show_dataset()
        self.remove_all_string_type_columns_from_dataset()
        self.update_dataset_columns_names()
        self.update_dataset_columns_types()
        self.define_columns_with_missing_values_from_dataset()
        self.define_int_float_dtype_missing_value_columns()
        self.replace_columns_NaN_values_with_mean_values()
        self.define_features_set_X()
        self.define_target_set_y()
        self.features_max_abs_normalization()
        print('Features set X after max abs normalization:')
        self.show_features_set_X()
        print('=' * 40)
        if self.excluded_features_X:
            self.features_set_X = self.features_set_X.drop(
                columns=self.excluded_features_X
            )
        print('=' * 40)
        print('Splitting dataset for train and test stages.')
        self.split_dataset_into_train_test_parts()
        print('=' * 40)
        # Renumber rows 0..n-1 so the features line up with the label frames
        # (which have a fresh 0..n-1 index) when concatenated for MLflow.
        self.train_set_X = self.train_set_X.reset_index(drop=True)
        self.test_set_X = self.test_set_X.reset_index(drop=True)
        self.define_mlflow_pandas_datasets()

    def show_dataset(self):
        """Print the current dataset."""
        print(self.CSV_dataset)

    def get_dataset_columns_names(self):
        """Return the current dataset's column index."""
        return self.CSV_dataset.columns

    def update_dataset_columns_names(self):
        """Refresh the cached column names from the current dataset."""
        self.dataset_column_names = self.get_dataset_columns_names()

    def update_dataset_columns_types(self):
        """Refresh the cached column -> dtype mapping from the current dataset."""
        self.dataset_columns_types = dict(self.CSV_dataset.dtypes)

    def show_dataset_columns_types(self):
        """Print the cached column -> dtype mapping."""
        print(self.dataset_columns_types)

    def show_dataset_columns(self):
        """Print the cached column names."""
        print(self.dataset_column_names)

    def get_dataset_in_CSV_from_URL(self, url: str):
        """Read a CSV file from ``url`` (or a local path) into a DataFrame."""
        return pd.read_csv(url)

    def remove_all_string_type_columns_from_dataset(self):
        """Drop every object-dtype column from the dataset.

        Dtypes are read from the current frame, so calling this twice is safe.
        """
        string_columns = [
            column_name
            for column_name, column_dtype in self.CSV_dataset.dtypes.items()
            if column_dtype == object
        ]
        self.CSV_dataset = self.CSV_dataset.drop(columns=string_columns)
        return True

    def define_columns_with_missing_values_from_dataset(self):
        """Record (and print) every cached column that contains a NaN.

        The list is rebuilt on each call, so repeated calls do not duplicate
        entries.
        """
        self.dataset_columns_with_missing_values = []
        missing_values_mask = self.CSV_dataset.isna()
        for dataset_column in self.dataset_column_names:
            if missing_values_mask[dataset_column].any():
                self.dataset_columns_with_missing_values.append(dataset_column)
                print(f'Missing values column - {dataset_column}!')
        return True

    def define_int_float_dtype_missing_value_columns(self):
        """Keep only the numeric columns among those with missing values."""
        self.missing_values_columns_with_int_float_dtypes = [
            column
            for column in self.dataset_columns_with_missing_values
            if pd.api.types.is_numeric_dtype(self.dataset_columns_types[column])
        ]
        return True

    def replace_columns_NaN_values_with_mean_values(self):
        """Fill missing values of the selected numeric columns with their mean."""
        for column in self.missing_values_columns_with_int_float_dtypes:
            column_mean_value = self.CSV_dataset[column].mean(skipna=True)
            self.CSV_dataset[column] = self.CSV_dataset[column].fillna(column_mean_value)
        return True

    def define_features_set_X(self):
        """Set X to the dataset minus *all* target columns."""
        # Drop every target at once; dropping them one by one from the
        # original frame would leave all but the last target in X.
        self.features_set_X = self.CSV_dataset.drop(columns=self.target_y_columns)
        return True

    def show_features_set_X(self):
        """Print the feature matrix X."""
        print(self.features_set_X)

    def define_target_set_y(self):
        """Set y to the target column(s) as a DataFrame."""
        self.target_set_y = self.CSV_dataset[self.target_y_columns]
        return True

    def split_dataset_into_train_test_parts(self):
        """Split X/y 80/20 (seed 42) and flatten the labels to 1-D arrays.

        The flattening assumes a single target column.
        """
        self.train_set_X, self.test_set_X, \
        self.train_set_y, self.test_set_y = train_test_split(
            self.features_set_X, self.target_set_y, test_size=0.2, random_state=42
        )
        self.train_set_y = self.train_set_y.to_numpy().reshape(-1)
        self.test_set_y = self.test_set_y.to_numpy().reshape(-1)
        return True

    def features_max_abs_normalization(self):
        """Scale each feature column by its maximum absolute value.

        An all-zero column is divided by 1, so it stays zero instead of
        becoming NaN (0 / 0).
        """
        max_abs_values = self.features_set_X.abs().max().replace(0, 1)
        self.features_set_X = self.features_set_X / max_abs_values
        return True

    def init_classifier_model_for_experiment(self, model_name: str):
        """Instantiate the classifier registered under ``model_name``."""
        self.model_for_experiment = self.ML_classifiers_model[model_name]()

    def train_classifier_model(self):
        """Fit the classifier on the training split."""
        self.model_for_experiment.fit(self.train_set_X, self.train_set_y)

    def make_prediction_on_test_dataset(self):
        """Predict labels for the test split."""
        self.prediction_of_target_y = self.model_for_experiment.predict(self.test_set_X)

    def evaluate_quality_of_ML_model(self):
        """Compute the confusion matrix, report, accuracy and weighted metrics."""
        self.define_confusion_matrix()
        self.define_classification_report()
        self.define_model_accuracy()
        self.define_model_weighted_average_precision()
        self.define_model_weighted_average_recall()
        self.define_model_weighted_average_f1_score()

    def define_confusion_matrix(self):
        """Confusion matrix over the model's classes, in ``classes_`` order."""
        # Pinning labels keeps the matrix aligned with the display labels even
        # if a class is absent from the test labels and predictions.
        self.confusion_matrix = confusion_matrix(
            self.test_set_y, self.prediction_of_target_y,
            labels=self.model_for_experiment.classes_,
        )

    def define_model_accuracy(self):
        """Test-set accuracy rounded to 3 decimals."""
        self.accuracy = round(
            accuracy_score(self.test_set_y, self.prediction_of_target_y), 3
        )

    def _weighted_average_metric(self, metric_name: str) -> float:
        """Return the report's weighted-average ``metric_name``, rounded to 3 places."""
        return round(self.classification_report["weighted avg"][metric_name], 3)

    def define_model_weighted_average_precision(self):
        """Weighted-average precision from the classification report."""
        self.weighted_average_precision = self._weighted_average_metric("precision")

    def define_model_weighted_average_recall(self):
        """Weighted-average recall from the classification report."""
        self.weighted_average_recall = self._weighted_average_metric("recall")

    def define_model_weighted_average_f1_score(self):
        """Weighted-average F1 score from the classification report."""
        self.weighted_average_f1_score = self._weighted_average_metric("f1-score")

    def define_confusion_matrix_figure(self):
        """Render the confusion matrix and keep the Matplotlib figure."""
        confusion_matrix_display_obj = ConfusionMatrixDisplay(
            confusion_matrix=self.confusion_matrix,
            display_labels=self.model_for_experiment.classes_,
        )
        self.confusion_matrix_figure = confusion_matrix_display_obj.plot().figure_

    def define_classification_report(self):
        """Classification report as a dict, using the human-readable class names."""
        # labels= keeps the number of reported classes equal to the number of
        # target_names even if a class is absent from the test split.
        self.classification_report = classification_report(
            self.test_set_y, self.prediction_of_target_y,
            labels=self.model_for_experiment.classes_,
            target_names=self.target_y_classes_names,
            digits=3,
            output_dict=True,
        )

    def define_mlflow_pandas_datasets(self):
        """Wrap train/test X+y frames as MLflow pandas datasets."""
        # Relies on train/test X having a 0..n-1 index (reset before this call)
        # so the column-wise concat aligns rows with labels.
        train_df_X_y = pd.concat(
            [self.train_set_X, self.get_labels_y_as_dataframe(self.train_set_y)],
            axis=1,
        )
        test_df_X_y = pd.concat(
            [self.test_set_X, self.get_labels_y_as_dataframe(self.test_set_y)],
            axis=1,
        )
        self.datasets_for_mlflow["train"] = mlf.data.from_pandas(
            train_df_X_y, targets=self.target_y_columns[0]
        )
        self.datasets_for_mlflow["test"] = mlf.data.from_pandas(
            test_df_X_y, targets=self.target_y_columns[0]
        )

    def get_labels_y_as_dataframe(self, labels: np.ndarray):
        """Wrap a label array as a DataFrame named after the target column(s)."""
        return pd.DataFrame(labels, columns=self.target_y_columns)

In [None]:
# This notebook is only for testing; it serves as an API for the main classifier program.
class MLFlow_Experiment_Runner:
    """Run one classifier experiment and record it on an MLflow tracking server.

    The tracking server host comes from the ``MLFLOW_IP`` environment variable
    (falling back to ``localhost``) on port 5000. ``main`` does the following:
      1. ensures the experiment ``<model>_classifier`` exists and is active;
      2. prunes the oldest runs so at most ``max_experiment_runs`` remain;
      3. starts a run named ``train_test_<N>`` that trains the classifier and
         logs metrics, the confusion-matrix figure, the report, the model and
         the train/test datasets;
      4. saves the model locally under ``trained_models/``.
    """

    # Prefix of the auto-generated run names ("train_test_1", "train_test_2", ...).
    RUN_NAME_PREFIX = 'train_test_'

    def __init__(self, classifier_model_name: str):
        """Configure the experiment name and tracking-server URI for a model."""
        self.classifier_model_name = classifier_model_name
        self.experiment_name = self.get_experiment_name()
        self.active_experiment = None  # set in main()
        self.current_run_name = ''
        self.max_experiment_runs = 10
        # Default to a local server instead of building "http://None:5000"
        # when MLFLOW_IP is not set.
        self.tracking_server_IP = os.getenv('MLFLOW_IP', 'localhost')
        self.tracking_server_port = 5000
        self.tracking_server_URI = f"http://{self.tracking_server_IP}:{self.tracking_server_port}"

    def main(self, target_classifier_object):
        """Prepare the experiment, prune old runs and run ``target_classifier_object``."""
        self.set_mlflow_server_URI()
        if self.experiment_exist():
            print(f'[INFO] Experiment - {self.experiment_name} - active!')
        else:
            print(f'[INFO] Experiment - {self.experiment_name} does not exist!')
            self.create_new_experiment()
        # create_experiment() does not make the experiment current, so activate
        # it in both branches; otherwise a first run lands in "Default".
        self.active_experiment = self.set_experiment_as_active()
        self.delete_old_experiments_runs()

        self.current_run_name = self.get_current_run_name()
        self.start_experiment_run(target_classifier_object)
        # TODO: think, how to get all logged models and
        # how to delete some by criteria

    def start_experiment_run(self, classifier_object):
        """Train/evaluate ``classifier_object`` inside a new MLflow run and log results."""
        # Disable sklearn autologging before fit() runs, so only the explicit
        # logging below is recorded.
        mlf.sklearn.autolog(disable=True)
        with mlf.start_run(run_name=self.current_run_name):
            print("[INFO] Start classifier experiment...")
            classifier_object.main()
            mlf.log_metrics({
                'accuracy': classifier_object.accuracy,
                'precision': classifier_object.weighted_average_precision,
                'recall': classifier_object.weighted_average_recall,
                'f1-score': classifier_object.weighted_average_f1_score,
            })
            mlf.log_figure(
                classifier_object.confusion_matrix_figure,
                artifact_file='confusion_matrix.png'
            )
            mlf.log_dict(
                dictionary=classifier_object.classification_report,
                artifact_file='classification_report.json'
            )
            # Infer the signature from inputs paired with the model's own
            # predictions on those same inputs.
            model_signature = self.get_model_signature(
                classifier_object.test_set_X,
                classifier_object.prediction_of_target_y
            )
            log_model(
                sk_model=classifier_object.model_for_experiment,
                name=self.classifier_model_name,
                signature=model_signature
            )
            mlf.log_input(
                classifier_object.datasets_for_mlflow["train"],
                context="training"
            )
            mlf.log_input(
                classifier_object.datasets_for_mlflow["test"],
                context="testing"
            )
            save_model(
                sk_model=classifier_object.model_for_experiment,
                path=self.get_dirname_for_model_saving(classifier_object)
            )

    def delete_old_experiments_runs(self):
        """Delete the oldest runs beyond ``max_experiment_runs``; True if any were deleted."""
        runs_name_endtime_pairs = self.get_experiment_runs_name_endtime_pairs()
        # Unfinished runs have end_time None: sort them as newest so they are
        # never pruned (and None is never compared with an int).
        runs_name_endtime_pairs.sort(
            key=lambda run: float('inf') if run['end_time'] is None else run['end_time']
        )
        if self.experiment_exceed_max_runs(runs_name_endtime_pairs):
            print(f"[INFO] Experiment has more than {self.max_experiment_runs} runs. Deleting old runs...")
            self.delete_old_time_experiment_runs(runs_name_endtime_pairs)
            return True
        return False

    def set_mlflow_server_URI(self):
        """Point the MLflow client at the configured tracking server."""
        print('[INFO] Setting Tracking Server URI...')
        mlf.set_tracking_uri(self.tracking_server_URI)

    def get_experiment_name(self):
        """Experiment name derived from the classifier model name."""
        return f"{self.classifier_model_name}_classifier"

    def experiment_exist(self):
        """Return True if the experiment already exists on the server."""
        if mlf.get_experiment_by_name(self.experiment_name):
            print(f'[INFO] Experiment {self.experiment_name} already exist!')
            return True
        return False

    def create_new_experiment(self):
        """Create the experiment on the server (does not activate it)."""
        print(f'[INFO] Creating experiment - {self.experiment_name}...')
        mlf.create_experiment(self.experiment_name)

    def set_experiment_as_active(self):
        """Make the experiment current for subsequent runs and return it."""
        return mlf.set_experiment(self.experiment_name)

    def get_current_run_name(self):
        """Return a run name ``train_test_<N>`` not used by any remaining run.

        N is one more than the highest existing number, so names (and the
        local model directory derived from them) do not repeat after old runs
        have been pruned.
        """
        runs = mlf.search_runs(
            experiment_names=[self.experiment_name],
            output_format='list'
        )
        run_names = [run.info.run_name for run in runs]
        return f"train_test_{self._get_next_run_number(run_names)}"

    def _get_next_run_number(self, run_names) -> int:
        """Return max N over names ``train_test_N`` plus one (1 if none match)."""
        numbers = []
        for name in run_names:
            if not name or not name.startswith(self.RUN_NAME_PREFIX):
                continue
            suffix = name[len(self.RUN_NAME_PREFIX):]
            if suffix.isdecimal():
                numbers.append(int(suffix))
        return max(numbers, default=0) + 1

    def delete_old_time_experiment_runs(self, runs: list):
        """Delete the runs selected by ``get_target_runs_for_deleting``."""
        for run in self.get_target_runs_for_deleting(runs):
            run_ID = run['run_ID']
            mlf.delete_run(run_ID)
            print(f"[INFO] Deleted Run with ID: {run_ID}")
        return True

    def get_target_runs_for_deleting(self, sorted_runs_by_time: list):
        """Return the oldest runs beyond the newest ``max_experiment_runs``.

        ``sorted_runs_by_time`` must be ordered oldest first.
        """
        excess_runs = len(sorted_runs_by_time) - self.max_experiment_runs
        return sorted_runs_by_time[:max(excess_runs, 0)]

    def experiment_exceed_max_runs(self, runs: list):
        """Return True when there are more runs than ``max_experiment_runs``."""
        return len(runs) > self.max_experiment_runs

    def get_experiment_runs_name_endtime_pairs(self) -> list[dict]:
        """List ``{'run_ID', 'end_time'}`` dicts for the experiment's active runs."""
        runs = mlf.search_runs(
            experiment_names=[self.experiment_name],
            output_format='list'
        )
        return [
            {'run_ID': run.info.run_id, 'end_time': run.info.end_time}
            for run in runs
        ]

    def get_model_signature(self, input_dataset: pd.DataFrame, prediction):
        """Infer an MLflow model signature from example inputs and outputs."""
        return infer_signature(
            model_input=input_dataset,
            model_output=prediction
        )

    def get_dirname_for_model_saving(self, classifier_obj):
        """Local path ``trained_models/<model>_<run name>`` for ``save_model``."""
        return (
            f"trained_models/{classifier_obj.selected_model_name_for_experiment}"
            f"_{self.current_run_name}"
        )
        
TITANIC_DATASET_URL = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"

# Guard the side-effecting run (download, training, MLflow logging) so the
# classes above can be imported without executing it. Inside a Jupyter cell
# __name__ is "__main__", so the notebook still runs it.
if __name__ == "__main__":
    dataset_analyzer = Dataset_Analyzer(TITANIC_DATASET_URL)
    mlflow_experiment = MLFlow_Experiment_Runner('random_forest')
    mlflow_experiment.main(dataset_analyzer)