In [41]:
import kfp
import kfp.components as comp
import requests
import kfp.dsl as dsl

In [42]:
!pip show kfp

DEPRECATION: Loading egg at c:\users\user\appdata\local\programs\python\python312\lib\site-packages\pyyaml-5.4.1-py3.12-win-amd64.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation.. Discussion can be found at https://github.com/pypa/pip/issues/12330

Name: kfp





Version: 1.8.18
Summary: KubeFlow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors


In [43]:
def prepare_data():
    """Build the churn modelling dataset and write it to ``data/final_df.csv``.

    The step reads the master table, prints exploratory summaries, replaces
    IQR outliers in the numeric features with the column median, oversamples the
    churn class (label 1) so it makes up about 30 % of the rows, draws diagnostic
    plots, selects features with a random forest and saves the rebalanced frame.

    Every import is local to the function so that its source is self-contained.

    Returns:
        The rebalanced pandas DataFrame restricted to ``Idclient``, the features
        selected by the random forest and ``Churn_next_trim``, sorted by
        ``Idclient``.
    """
    import os

    import matplotlib.pyplot as plt
    import pandas as pd
    import seaborn as sns
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.feature_selection import SelectFromModel
    from sklearn.linear_model import Lasso
    from sklearn.utils import resample

    print("---- Inside prepare_data component ----")

    # Load the data.
    # NOTE(review): the URL below embeds an access token (`?token=...`). That is a
    # hard-coded credential and presumably stops working once the token expires —
    # confirm, then move the file to a stable location or inject the token at run time.
    num_rows_to_read = 100000
    data = pd.read_csv("https://raw.githubusercontent.com/Xephreen/Churn_data/main/Master_table.csv?token=GHSAT0AAAAAACRE22CC6BYWQNEKXREY53XKZUCZ6DQ", nrows=num_rows_to_read)

    # Sort the DataFrame by the "Idclient" column
    df = data.sort_values(by="Idclient")

    # Preview of the first rows
    print("Aperçu des premières lignes du DataFrame :")
    print(df.head())

    # Column names and dtypes; DataFrame.info() prints by itself and returns None,
    # so it is called directly instead of being wrapped in print().
    print("Informations sur les colonnes et les types de données :")
    df.info()

    # Statistical summary of the numeric variables
    print("Résumé statistique des variables numériques :")
    print(df.describe())

    # Unique values of every column
    print("Valeurs uniques dans chaque colonne :")
    for col in df.columns:
        print(f"{col}: {df[col].unique()}")

    # Number of missing values per column
    print("Nombre de valeurs manquantes dans chaque colonne :")
    print(df.isnull().sum())

    # Distribution of the target variable
    print("Distribution de la variable cible :")
    print(df['Churn_next_trim'].value_counts())

    # Numeric feature columns: identifiers, the target and ACTIVE_RATIO are excluded
    excluded_cols = ('Idclient', 'Churn_next_trim', 'ACTIVE_RATIO')
    numeric_cols = [
        col
        for col in df.select_dtypes(include=['int64', 'float64']).columns
        if col not in excluded_cols
    ]

    # Outlier handling: values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] become the median.
    # The median is computed once per column (not once per row) and the replacement
    # is vectorised; comparisons with NaN are False, so missing values are left as is.
    for col in numeric_cols:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        median = df[col].median()
        is_outlier = (df[col] < lower_bound) | (df[col] > upper_bound)
        df[col] = df[col].mask(is_outlier, median)

    # Split the classes and oversample class 1 (with replacement) to a 30/70 ratio
    df_class_0 = df[df['Churn_next_trim'] == 0]
    df_class_1 = df[df['Churn_next_trim'] == 1]
    n_class_0 = len(df_class_0)
    n_class_1_resampled = int(0.3 * n_class_0 / 0.7)
    df_class_1_resampled = resample(df_class_1, replace=True, n_samples=n_class_1_resampled, random_state=42)
    df_resampled = pd.concat([df_class_0, df_class_1_resampled])

    # Class distributions before and after resampling
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    df['Churn_next_trim'].value_counts().plot(kind='bar', color=['blue', 'orange'])
    plt.title('Distribution des classes avant rééchantillonnage')
    plt.xlabel('Classe')
    plt.ylabel("Nombre d'observations")

    plt.subplot(1, 2, 2)
    df_resampled['Churn_next_trim'].value_counts().plot(kind='bar', color=['blue', 'orange'])
    plt.title('Distribution des classes après rééchantillonnage')
    plt.xlabel('Classe')
    plt.ylabel("Nombre d'observations")

    plt.tight_layout()
    plt.show()

    # Histograms of the numeric variables
    df[numeric_cols].hist(figsize=(15, 10))
    plt.show()

    # Bar charts of the categorical variables
    cat_cols = df.select_dtypes(include=['object']).columns
    palette = sns.color_palette("tab10", 7)
    for col in cat_cols:
        counts = df[col].value_counts()
        plt.figure(figsize=(10, 6))
        bars = plt.bar(counts.index, counts.values, color=palette)
        plt.title(f"Diagramme en barres pour {col}", fontsize=14)
        plt.xlabel(col, fontsize=12)
        plt.ylabel("Fréquence", fontsize=12)
        plt.legend(bars, counts.index, fontsize=10)
        plt.show()

    # Univariate distribution of every categorical variable
    for column in cat_cols:
        sns.displot(df[column], kde=True)
        plt.title(f"Distribution de {column}")
        plt.show()

    # Correlation matrix of the numeric variables
    correlation_matrix = df.select_dtypes(include=['int64', 'float64']).corr()
    plt.figure(figsize=(12, 8))
    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt=".2f")
    plt.title("Matrice de corrélation entre les variables numériques", fontsize=16)
    plt.show()

    # Feature selection (fitted on the cleaned, not yet resampled, frame)
    X = df[numeric_cols]
    y = df['Churn_next_trim']
    selector_lasso = SelectFromModel(Lasso(alpha=0.01))
    selector_lasso.fit(X, y)
    selected_features_lasso = X.columns[selector_lasso.get_support()]

    # SelectFromModel fits its own clone of the estimator, so the forest does not
    # need to be fitted separately beforehand.
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    selector_rf = SelectFromModel(rf, threshold=0.05)
    selector_rf.fit(X, y)
    selected_features_rf = X.columns[selector_rf.get_support()]

    print("Caractéristiques sélectionnées par Lasso :")
    print(selected_features_lasso)
    print("\nCaractéristiques sélectionnées par RandomForest :")
    print(selected_features_rf)

    # Keep the identifier, the random-forest selection and the target
    df_resampled = df_resampled[['Idclient'] + list(selected_features_rf) + ['Churn_next_trim']]

    # Save the final DataFrame; make sure the output directory exists first
    df_resampled = df_resampled.sort_values(by='Idclient')
    os.makedirs('data', exist_ok=True)
    df_resampled.to_csv('data/final_df.csv', index=False)
    print("\n ---- data csv is saved to PV location /data/final_df.csv ----")

    # Return the final DataFrame for later use if needed
    return df_resampled

Author-email: 
License: 
Location: C:\Users\user\AppData\Roaming\Python\Python312\site-packages


In [44]:
def train_test_split():
    """Split ``data/final_df.csv`` into stratified train/test sets saved as ``.npy`` files.

    Features are every column except ``Churn_next_trim`` and ``Idclient``; the
    target is ``Churn_next_trim``. 30 % of the rows go to the test set, stratified
    on the target with ``random_state=47``. The four parts are written to
    ``data/X_train.npy``, ``data/X_test.npy``, ``data/y_train.npy`` and
    ``data/y_test.npy`` and then printed.

    The labels are saved as 1-D arrays (a Series, not a one-column DataFrame) so
    that estimators fitted on them do not receive a column vector.
    """
    import numpy as np
    import pandas as pd
    # Aliased so the helper is not confused with this function of the same name.
    from sklearn.model_selection import train_test_split as split_dataset

    print("---- Inside train_test_split component ----")
    final_data = pd.read_csv('data/final_df.csv')
    target_column = 'Churn_next_trim'
    X = final_data.loc[:, (final_data.columns != target_column) & (final_data.columns != 'Idclient')]
    y = final_data[target_column]

    X_train, X_test, y_train, y_test = split_dataset(X, y, test_size=0.3, stratify=y, random_state=47)

    parts = {'X_train': X_train, 'X_test': X_test, 'y_train': y_train, 'y_test': y_test}

    for part_name, part in parts.items():
        np.save(f'data/{part_name}.npy', part)

    for part_name, part in parts.items():
        print(f"\n---- {part_name} ----")
        print("\n")
        print(part)

Requires: absl-py, click, cloudpickle, Deprecated, docstring-parser, fire, google-api-core, google-api-python-client, google-auth, google-cloud-storage, jsonschema, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, pydantic, PyYAML, requests-toolbelt, strip-hints, tabulate, typer, uritemplate
Required-by: 


In [45]:
def training_basic_classifier_with_early_stopping(model_choice):
    """Train the requested classifier on the saved training split and pickle it.

    The training arrays are read from ``data/X_train.npy`` / ``data/y_train.npy``
    and 20 % of them are held out as a validation set. XGBoost is fitted with
    early stopping (10 rounds) evaluated on that validation set; every other
    model is fitted on the remaining 80 % only. The fitted model is written to
    ``data/<model_choice>.pkl``, using the name exactly as it was passed in.

    Args:
        model_choice: One of 'Logistic Regression', 'Decision Trees',
            'Random Forest', 'Support Vector Machines', 'k-Nearest Neighbors',
            'Naive Bayes', 'XGBoost'. The spellings 'Decision Tree',
            'Random Forests' and 'Support Vector Machine' are accepted as well.

    Raises:
        ValueError: If ``model_choice`` is not a recognised model name.
    """
    import pickle

    import numpy as np
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split
    from sklearn.naive_bayes import GaussianNB
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.svm import SVC
    from sklearn.tree import DecisionTreeClassifier
    from xgboost import XGBClassifier

    print("---- Inside training_basic_classifier_with_early_stopping component ----")

    # Load the data; labels are flattened in case they were saved as a column vector
    X_train = np.load('data/X_train.npy', allow_pickle=True)
    y_train = np.ravel(np.load('data/y_train.npy', allow_pickle=True))

    # Split into a training part and a validation part
    X_train_split, X_val_split, y_train_split, y_val_split = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

    # Zero-argument factories, so only the requested estimator is instantiated
    model_factories = {
        'Logistic Regression': lambda: LogisticRegression(max_iter=500, solver='liblinear', penalty='l2', C=1.0),
        'Decision Trees': lambda: DecisionTreeClassifier(max_depth=5),
        'Random Forest': lambda: RandomForestClassifier(n_estimators=100, max_depth=10),
        'Support Vector Machines': lambda: SVC(probability=True, C=1.0, kernel='linear'),
        'k-Nearest Neighbors': lambda: KNeighborsClassifier(n_neighbors=5),
        'Naive Bayes': lambda: GaussianNB(),
        'XGBoost': lambda: XGBClassifier(use_label_encoder=False, eval_metric='logloss', n_estimators=100, max_depth=5),
    }
    # Alternative spellings, including the ones the hyperparameter-tuning step uses
    name_aliases = {
        'Decision Tree': 'Decision Trees',
        'Random Forests': 'Random Forest',
        'Support Vector Machine': 'Support Vector Machines',
    }

    canonical_name = name_aliases.get(model_choice, model_choice)
    if canonical_name not in model_factories:
        # The list is generated from the table so it cannot drift from what is accepted
        accepted = ", ".join(repr(name) for name in model_factories)
        raise ValueError(f"Invalid model choice. Please select one of the following: {accepted}")
    classifier = model_factories[canonical_name]()

    # Train the model, with early stopping for XGBoost
    if canonical_name == 'XGBoost':
        eval_set = [(X_train_split, y_train_split), (X_val_split, y_val_split)]
        classifier.fit(X_train_split, y_train_split, eval_set=eval_set, early_stopping_rounds=10, verbose=True)
    else:
        classifier.fit(X_train_split, y_train_split)

    # Save the trained model under the name the caller used
    with open(f'data/{model_choice}.pkl', 'wb') as f:
        pickle.dump(classifier, f)

    print(f"\n{model_choice} classifier is trained and saved to PV location /data/{model_choice}.pkl")

In [46]:
def predict_on_test_data(model_path, X_test_path, y_pred_path):
    """Predict on the saved test features with a pickled model and store the result.

    Args:
        model_path: Path of the pickled model; it must expose ``predict``.
        X_test_path: Path of the ``.npy`` file holding the test features.
        y_pred_path: Path the predictions are written to with ``numpy.save``.
    """
    import pickle

    import numpy as np

    print("---- Inside predict_on_test_data component ----")

    # Restore the model first, then read the features it will score
    with open(model_path, 'rb') as model_file:
        estimator = pickle.loads(model_file.read())
    features = np.load(X_test_path, allow_pickle=True)

    # Predict and persist
    predictions = estimator.predict(features)
    np.save(y_pred_path, predictions)

    print("\n---- Predicted classes ----")
    print(predictions)

In [47]:
def get_metrics(y_test_path, y_pred_path):
    """Print classification metrics for saved predictions against saved true labels.

    Prints the scikit-learn classification report followed by accuracy,
    precision, recall, F1, AUC, Matthews correlation coefficient and the
    confusion matrix.

    Precision, recall and F1 are reported for the positive class (label 1) when
    the labels are 0/1. Micro-averaging on a single-label problem makes all three
    equal to accuracy, so it is only kept as a fallback for other label sets.

    Args:
        y_test_path: Path of the ``.npy`` file with the true labels.
        y_pred_path: Path of the ``.npy`` file with the predicted labels.
    """
    import numpy as np
    from sklearn import metrics
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix, matthews_corrcoef

    print("---- Inside get_metrics component ----")

    # Load true labels and predictions; flatten in case either was saved as a column vector
    y_test = np.ravel(np.load(y_test_path, allow_pickle=True))
    y_pred = np.ravel(np.load(y_pred_path, allow_pickle=True))

    # Pick the averaging mode from the labels actually present
    observed_labels = set(np.unique(y_test).tolist()) | set(np.unique(y_pred).tolist())
    average = 'binary' if observed_labels <= {0, 1} else 'micro'

    # Compute the metrics
    acc = accuracy_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred, average=average)
    recall = recall_score(y_test, y_pred, average=average)
    f1 = f1_score(y_test, y_pred, average=average)
    # NOTE(review): the AUC here is computed from hard class predictions because no
    # scores are saved by the prediction step; it is coarser than a score-based AUC.
    auc = roc_auc_score(y_test, y_pred, average='micro', multi_class='ovr')
    mcc = matthews_corrcoef(y_test, y_pred)
    conf_matrix = confusion_matrix(y_test, y_pred)

    print(metrics.classification_report(y_test, y_pred))

    print("\nModel Metrics:")
    print(f"Accuracy: {round(acc, 2)}")
    print(f"Precision: {round(prec, 2)}")
    print(f"Recall: {round(recall, 2)}")
    print(f"F1-Score: {round(f1, 2)}")
    print(f"AUC: {round(auc, 2)}")
    print(f"MCC: {round(mcc, 2)}")
    print("Confusion Matrix:\n", conf_matrix)

In [48]:
def hyperparameter_tuning(X_train_path, y_train_path, model_choice):
    """Run a randomized hyperparameter search for one model and pickle the best estimator.

    The search uses 3-fold cross-validation scored on accuracy. The best estimator
    is written to ``data/best_<model_choice>.pkl``, using the name exactly as it
    was passed in, and its hyperparameters are printed.

    Args:
        X_train_path: Path of the ``.npy`` file with the training features.
        y_train_path: Path of the ``.npy`` file with the training labels.
        model_choice: One of 'Logistic Regression', 'Random Forest',
            'Support Vector Machine', 'k-Nearest Neighbors', 'Naive Bayes',
            'Decision Tree', 'XGBoost'. The spellings 'Decision Trees',
            'Random Forests' and 'Support Vector Machines' are accepted as well.

    Returns:
        The best estimator found by the search.

    Raises:
        KeyError: If ``model_choice`` is not a recognised model name.
    """
    import os
    import pickle

    import numpy as np
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import RandomizedSearchCV
    from sklearn.naive_bayes import GaussianNB
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.svm import SVC
    from sklearn.tree import DecisionTreeClassifier
    from xgboost import XGBClassifier

    # Estimator classes by name; only the requested one is instantiated
    models = {
        'Logistic Regression': LogisticRegression,
        'Random Forest': RandomForestClassifier,
        'Support Vector Machine': SVC,
        'k-Nearest Neighbors': KNeighborsClassifier,
        'Naive Bayes': GaussianNB,
        'Decision Tree': DecisionTreeClassifier,
        'XGBoost': XGBClassifier,
    }

    # Search space for each model (currently a single candidate per parameter)
    param_distributions = {
        'Logistic Regression': {'C': [1.0]},
        'Random Forest': {'n_estimators': [100], 'max_depth': [10]},
        'Support Vector Machine': {'C': [1.0], 'gamma': ['scale']},
        'k-Nearest Neighbors': {'n_neighbors': [5]},
        'Naive Bayes': {},  # no hyperparameters are tuned for Naive Bayes
        'Decision Tree': {'max_depth': [10]},
        'XGBoost': {'n_estimators': [100], 'max_depth': [5]},
    }

    # Alternative spellings, including the ones the training step uses
    name_aliases = {
        'Decision Trees': 'Decision Tree',
        'Random Forests': 'Random Forest',
        'Support Vector Machines': 'Support Vector Machine',
    }
    canonical_name = name_aliases.get(model_choice, model_choice)
    if canonical_name not in models:
        accepted = ", ".join(repr(name) for name in models)
        # KeyError is kept (an unknown name raised KeyError before) but now says why
        raise KeyError(f"Invalid model choice {model_choice!r}. Please select one of the following: {accepted}")

    # Load the training data; labels are flattened in case they were saved as a column vector
    X_train = np.load(X_train_path, allow_pickle=True)
    y_train = np.ravel(np.load(y_train_path, allow_pickle=True))

    # Search for the best hyperparameters of the chosen model
    search = RandomizedSearchCV(
        models[canonical_name](),
        param_distributions[canonical_name],
        n_iter=20,
        cv=3,
        scoring='accuracy',
        random_state=42,
        n_jobs=-1,
    )
    search.fit(X_train, y_train)

    # Save the best model found; make sure the output directory exists first
    best_model = search.best_estimator_
    os.makedirs('data', exist_ok=True)
    with open(f'data/best_{model_choice}.pkl', 'wb') as f:
        pickle.dump(best_model, f)

    # Show the hyperparameters of the best model
    print("Best Hyperparameters:")
    for param, value in search.best_params_.items():
        print(f"{param}: {value}")

    return best_model


In [49]:
def get_metrics_best_model(X_test_path, y_test_path, best_model_path):
    """Evaluate a pickled model on the saved test split and print its metrics.

    Prints the scikit-learn classification report followed by accuracy,
    precision, recall, F1, AUC (only when the model exposes ``predict_proba``),
    Matthews correlation coefficient and the confusion matrix.

    Precision, recall and F1 are reported for the positive class (label 1) when
    the labels are 0/1. Micro-averaging on a single-label problem makes all three
    equal to accuracy, so it is only kept as a fallback for other label sets.

    Args:
        X_test_path: Path of the ``.npy`` file with the test features.
        y_test_path: Path of the ``.npy`` file with the true test labels.
        best_model_path: Path of the pickled model to evaluate.
    """
    import pickle

    import numpy as np
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score, matthews_corrcoef, classification_report

    print("---- Inside get_metrics_best_model component ----")

    X_test = np.load(X_test_path, allow_pickle=True)
    # Flatten in case the labels were saved as a column vector
    y_test = np.ravel(np.load(y_test_path, allow_pickle=True))

    # Load the best model
    with open(best_model_path, 'rb') as f:
        best_model = pickle.load(f)

    # Predict with the best model
    y_pred_best_model = best_model.predict(X_test)

    # AUC from the positive-class probability (second column), when available
    if hasattr(best_model, "predict_proba"):
        y_pred_prob_best_model = best_model.predict_proba(X_test)[:, 1]
        auc = roc_auc_score(y_test, y_pred_prob_best_model)
    else:
        auc = None

    # Pick the averaging mode from the labels actually present
    observed_labels = set(np.unique(y_test).tolist()) | set(np.unique(y_pred_best_model).tolist())
    average = 'binary' if observed_labels <= {0, 1} else 'micro'

    # Compute the metrics from the predictions of the best model
    acc = accuracy_score(y_test, y_pred_best_model)
    prec = precision_score(y_test, y_pred_best_model, average=average)
    recall = recall_score(y_test, y_pred_best_model, average=average)
    f1 = f1_score(y_test, y_pred_best_model, average=average)
    cm = confusion_matrix(y_test, y_pred_best_model)
    mcc = matthews_corrcoef(y_test, y_pred_best_model)

    print(classification_report(y_test, y_pred_best_model))

    print("\nModel Metrics:")
    print(f"Accuracy: {round(acc, 2)}")
    print(f"Precision: {round(prec, 2)}")
    print(f"Recall: {round(recall, 2)}")
    print(f"F1-Score: {round(f1, 2)}")
    if auc is not None:
        print(f"AUC: {round(auc, 2)}")
    print(f"MCC: {round(mcc, 2)}")
    print("Confusion Matrix:\n", cm)

### Kubeflow pipeline creation starts here

In [50]:
# Component factory for prepare_data: python:3.7 base image plus pinned packages.
create_step_prepare_data = comp.create_component_from_func(
    func=prepare_data,
    packages_to_install=[
        'pandas==1.2.4',
        'numpy==1.21.0',
        'seaborn==0.11.2',
        'scikit-learn==0.24.2',
    ],
    base_image='python:3.7',
)

In [51]:
# Component factory for train_test_split: python:3.7 base image plus pinned packages.
create_step_train_test_split = comp.create_component_from_func(
    func=train_test_split,
    packages_to_install=[
        'pandas==1.2.4',
        'numpy==1.21.0',
        'scikit-learn==0.24.2',
    ],
    base_image='python:3.7',
)

In [52]:
# Component factory for the training step: python:3.7 base image plus pinned packages.
create_step_training_basic_classifier_with_early_stopping = comp.create_component_from_func(
    func=training_basic_classifier_with_early_stopping,
    packages_to_install=[
        'pandas==1.2.4',
        'numpy==1.21.0',
        'scikit-learn==0.24.2',
        'xgboost==1.4.2',
    ],
    base_image='python:3.7',
)

In [53]:
# Component factory for predict_on_test_data: python:3.7 base image plus pinned packages.
create_step_predict_on_test_data = comp.create_component_from_func(
    func=predict_on_test_data,
    packages_to_install=[
        'pandas==1.2.4',
        'numpy==1.21.0',
        'scikit-learn==0.24.2',
        'xgboost==1.4.2',
    ],
    base_image='python:3.7',
)

In [54]:
# Component factory for get_metrics: python:3.7 base image plus pinned packages.
create_step_get_metrics = comp.create_component_from_func(
    func=get_metrics,
    packages_to_install=[
        'pandas==1.2.4',
        'numpy==1.21.0',
        'scikit-learn==0.24.2',
        'xgboost==1.4.2',
    ],
    base_image='python:3.7',
)

In [55]:
# Component factory for hyperparameter_tuning: python:3.7 base image plus pinned packages.
create_step_hyperparameter_tuning = comp.create_component_from_func(
    func=hyperparameter_tuning,
    packages_to_install=[
        'pandas==1.2.4',
        'numpy==1.21.0',
        'scikit-learn==0.24.2',
        'xgboost==1.4.2',
    ],
    base_image='python:3.7',
)

In [56]:
# Component factory for get_metrics_best_model: python:3.7 base image plus pinned packages.
create_step_get_metrics_best_model = comp.create_component_from_func(
    func=get_metrics_best_model,
    packages_to_install=[
        'pandas==1.2.4',
        'numpy==1.21.0',
        'scikit-learn==0.24.2',
        'xgboost==1.4.2',
    ],
    base_image='python:3.7',
)

In [57]:
# Pipeline definition: seven steps run one after another, all mounting the same
# volume at `data_path`, with step caching switched off.
@dsl.pipeline(
    description='A pipeline that performs classification to predict the churn of customers',
    name='Local clients churn classification pipeline',
)
def churn_classifier_pipeline(data_path: str):
    # Volume shared by every step
    shared_volume_op = dsl.VolumeOp(
        name="t-vol",
        resource_name="t-vol",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    def mount_shared_volume(task):
        # Mount the shared volume at `data_path` and hand the task back for chaining.
        return task.add_pvolumes({data_path: shared_volume_op.volume})

    # Model under test and the file locations exchanged between the steps.
    # NOTE(review): `model_path` is absolute while every other path is relative;
    # the two only agree when the step's working directory is `/` — confirm.
    model_choice = 'k-Nearest Neighbors'
    model_path = '/data/{}.pkl'.format(model_choice)
    best_model_path = 'data/best_{}.pkl'.format(model_choice)
    X_train_path = 'data/X_train.npy'
    X_test_path = 'data/X_test.npy'
    y_train_path = 'data/y_train.npy'
    y_test_path = 'data/y_test.npy'
    y_pred_path = 'data/y_pred.npy'

    # Each step is explicitly ordered after the previous one
    prepare_data_task = mount_shared_volume(create_step_prepare_data())
    split_task = mount_shared_volume(create_step_train_test_split()).after(prepare_data_task)
    training_task = mount_shared_volume(
        create_step_training_basic_classifier_with_early_stopping(model_choice)
    ).after(split_task)
    prediction_task = mount_shared_volume(
        create_step_predict_on_test_data(model_path, X_test_path, y_pred_path)
    ).after(training_task)
    metrics_task = mount_shared_volume(
        create_step_get_metrics(y_test_path, y_pred_path)
    ).after(prediction_task)
    tuning_task = mount_shared_volume(
        create_step_hyperparameter_tuning(X_train_path, y_train_path, model_choice)
    ).after(metrics_task)
    best_model_metrics_task = mount_shared_volume(
        create_step_get_metrics_best_model(X_test_path, y_test_path, best_model_path)
    ).after(tuning_task)

    # A maximum cache staleness of "P0D" is set on every step
    pipeline_tasks = (
        prepare_data_task,
        split_task,
        training_task,
        prediction_task,
        metrics_task,
        tuning_task,
        best_model_metrics_task,
    )
    for task in pipeline_tasks:
        task.execution_options.caching_strategy.max_cache_staleness = "P0D"
    

In [58]:
# Compile the pipeline function into a YAML package in the working directory.
kfp.compiler.Compiler().compile(churn_classifier_pipeline, 'Churn_Classifier_pipeline.yaml')

In [59]:
client = kfp.Client()
# Client created with the SDK's default connection settings.
# To target a specific deployment, pass the settings explicitly instead, e.g.:
#     client = kfp.Client(host="http://localhost:8080/pipeline", namespace="kubeflow")
# An auth-service session can be supplied with
#     cookies=f"authservice_session={session_cookie}"
# Read `session_cookie` from an environment variable or a prompt; a real cookie
# value must never be pasted into the notebook (the one formerly here was removed).

In [60]:
import datetime

# Mount point of the shared volume inside every pipeline step
DATA_PATH = '/data'

# Read the clock once so the printed date and the experiment name always agree
run_date = datetime.datetime.now().date()
print(run_date)

pipeline_func = churn_classifier_pipeline
experiment_name = f'churn_classifier_exp_{run_date}'
run_name = pipeline_func.__name__ + ' run'
namespace = "kubeflow"

arguments = {"data_path": DATA_PATH}

# Compile the pipeline into a compressed package named after the experiment
kfp.compiler.Compiler().compile(pipeline_func, f'{experiment_name}.zip')

# Submit the pipeline directly from the pipeline function
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)


2024-07-01
