In [1]:
import os
import kfp
from kfp.components import load_component_from_file
from kfp.compiler.compiler import Compiler 
import kfp.dsl as dsl
from dotenv import load_dotenv

In [2]:
# Load the variables declared in ../.env into the process environment;
# the parameters cell reads the AWS_* values from os.environ.
load_dotenv(dotenv_path="../.env")

True

In [3]:
# Load the pipeline components.
# Each YAML definition is loaded into a callable that the pipeline function
# invokes to create one step.

component_yaml_paths = (
    "components_yaml/load_data_component.yaml",
    "components_yaml/preprocess_data_component.yaml",
    "components_yaml/tune_xgboost_component.yaml",
    "components_yaml/train_xgboost_component.yaml",
    "components_yaml/eval_xgboost_component.yaml",
    "components_yaml/deploy_xgboost_component.yaml",
)

# The unpacking order matches the order of the paths above.
(
    load_data_op,
    preprocess_data_op,
    tunehp_xgboost_op,
    train_xgboost_op,
    eval_xgboost_op,
    deploy_xgboost_op,
) = map(load_component_from_file, component_yaml_paths)

In [4]:
# Parameters declaration: the runtime arguments handed to the pipeline run.

def env_credential(name):
    """Read environment variable ``name`` and warn when it is unset or empty.

    Args:
        name: Name of the environment variable to read.

    Returns:
        Exactly what ``os.environ.get(name)`` returns: the value as a string,
        or ``None`` when the variable is not defined.
    """
    import warnings  # local import: only this helper needs it

    value = os.environ.get(name)
    if not value:
        # Make the problem visible here rather than letting an unusable
        # credential travel silently into the submitted run.
        warnings.warn(
            f"Environment variable {name!r} is not set or is empty; "
            "the pipeline will not receive a usable value for it.",
            RuntimeWarning,
            stacklevel=2,
        )
    return value


# NOTE(review): the AWS credentials are passed as ordinary pipeline parameters,
# so they presumably end up visible in the run's recorded parameters -- confirm,
# and consider injecting them through a cluster-side secret instead.
arguments = {
    "access_key_id": env_credential("AWS_ACCESS_KEY_ID"),
    "access_key_secret": env_credential("AWS_SECRET_ACCESS_KEY"),
    "filename": "adult_train.csv",
    "model_registry": "fyc-dev-env",
}

In [5]:
# Create the experiment that will hold the pipeline runs.
# The client is built with its default settings (no arguments).
experiment_name = "salary_prediction_pipeline_experiment"
experiment_description = "pipeline simple d'entraînement d'un model de machine learning"

client = kfp.Client()
experiment = client.create_experiment(
    name=experiment_name,
    description=experiment_description,
)


In [6]:
# Define the pipeline.
#
# Data flow between the steps, as wired below:
#   load_data -> preprocess_data -> tune_xgboost -> train_xgboost -> eval_xgboost
#                                                                 -> deploy_xgboost
# NOTE(review): the deploy step only consumes the training step's "modelname"
# output, so nothing here makes it wait for the evaluation step -- confirm
# that this is intended.

@dsl.pipeline(
    name="salary_prediction_pipeline",
    description="pipeline d'entrainement d'un modèle de prédiction de salaire",
)
def salary_prediction_pipeline(
    access_key_id: str,
    access_key_secret: str,
    filename: str,
    model_registry: str,
):
    # Arguments are passed positionally, so their order must match the input
    # order declared in each component's YAML definition.
    loaded = load_data_op(access_key_id, access_key_secret, filename)

    prepared = preprocess_data_op(loaded.outputs['data'])
    train_split = prepared.outputs['train']
    test_split = prepared.outputs['test']

    tuned = tunehp_xgboost_op(train_split, test_split)

    trained = train_xgboost_op(
        train_split,
        tuned.outputs["n_estimators"],
        tuned.outputs["learning_rate"],
        tuned.outputs["max_depth"],
        tuned.outputs["booster"],
        model_registry,
        access_key_id,
        access_key_secret,
    )
    model_name = trained.outputs["modelname"]

    # The last two steps are created for their effect on the pipeline graph;
    # their task objects are not used afterwards.
    eval_xgboost_op(
        test_split,
        model_name,
        model_registry,
        access_key_id,
        access_key_secret,
    )
    deploy_xgboost_op(model_name, access_key_id, access_key_secret)


In [None]:
# Submit the pipeline function as a run inside the experiment created above.
# The call is left as the cell's last expression so the run result is displayed.

client.create_run_from_pipeline_func(
    pipeline_func=salary_prediction_pipeline,
    arguments=arguments,
    experiment_name=experiment.name,
)

RunPipelineResult(run_id=277c551d-426d-49b8-86b6-04232784439b)