In [1]:
# commands to push docker image to hub:
# docker logout
# docker login
# docker tag image_name:tag user/hubname:image_name
# docker push user/hubname:image_name

In [2]:
import kfp
import kfp.components as comp
import requests
import kfp.dsl as dsl
import pandas as pd

In [3]:
def data_ingestion():
    from mlFlowProject import logger
    from mlFlowProject.pipeline.stage_01_data_ingestion import DataIngestionTrainingPipeline
    
    STAGE_NAME = "data ingestion stage"
    try:
        logger.info(f">>>> stage {STAGE_NAME} started <<<<")
        data_ingestion = DataIngestionTrainingPipeline()
        data_ingestion.main()
        logger.info(f">>>> stage {STAGE_NAME} completed <<<<\n\nx========x")
    except Exception as e:
        logger.exception(e)
        raise e

In [4]:
def data_validation():
    from mlFlowProject import logger
    from mlFlowProject.pipeline.stage_02_data_validation import DataValidationTrainingPipeline
    
    STAGE_NAME = "data validation stage"
    try:
        logger.info(f">>>> stage {STAGE_NAME} started <<<<")
        data_val = DataValidationTrainingPipeline()
        data_val.main()
        logger.info(f">>>> stage {STAGE_NAME} completed <<<<\n\nx========x")
    except Exception as e:
        logger.exception(e)
        raise e

In [5]:
def data_transformation():
    from mlFlowProject import logger
    from mlFlowProject.pipeline.stage_03_data_transformation import DataTransformationTrainingPipeline
    
    STAGE_NAME = "data transformation stage"
    try:
        logger.info(f">>>> stage {STAGE_NAME} started <<<<")
        data_transform = DataTransformationTrainingPipeline()
        data_transform.main()
        logger.info(f">>>> stage {STAGE_NAME} completed <<<<\n\nx========x")
    except Exception as e:
        logger.exception(e)
        raise e

In [6]:
def model_trainer():
    from mlFlowProject import logger
    from mlFlowProject.pipeline.stage_04_model_trainer import ModelTrainerTrainingPipeline
    
    STAGE_NAME = "Model training stage"
    try:
        logger.info(f">>>> stage {STAGE_NAME} started <<<<")
        model_train = ModelTrainerTrainingPipeline()
        model_train.main()
        logger.info(f">>>> stage {STAGE_NAME} completed <<<<\n\nx========x")
    except Exception as e:
        logger.exception(e)
        raise e

In [7]:
def model_evaluation():
    from mlFlowProject import logger
    from mlFlowProject.pipeline.stage_05_model_evaluation import ModelEvaluationTrainingPipeline
    
    STAGE_NAME = "Model evaluation stage"
    try:
        logger.info(f">>>> stage {STAGE_NAME} started <<<<")
        model_eval = ModelEvaluationTrainingPipeline()
        model_eval.main()
        logger.info(f">>>> stage {STAGE_NAME} completed <<<<\n\nx========x")
    except Exception as e:
        logger.exception(e)
        raise e

In [8]:
create_step_data_ingestion = kfp.components.create_component_from_func(
    func = data_ingestion,
    base_image='prajesh7/sail_mlflow:sail_mlflow',
    # packages_to_install=['mlFlowProject']
)

In [9]:
create_step_data_validation = kfp.components.create_component_from_func(
    func = data_validation,
    base_image='prajesh7/sail_mlflow:sail_mlflow',
    packages_to_install=['pandas', 'mlFlowProject']
)

In [10]:
create_setp_data_transformation = kfp.components.create_component_from_func(
    func = data_transformation,
    base_image='prajesh7/sail_mlflow:sail_mlflow',
    packages_to_install=['pandas', 'scikit-learn', 'mlFlowProject']
)

In [11]:
create_step_model_training = kfp.components.create_component_from_func(
    func = model_trainer,
    base_image='prajesh7/sail_mlflow:sail_mlflow',
    packages_to_install=['pandas', 'scikit-learn', 'numpy', 'mlFlowProject']
)

In [12]:
create_step_model_evaluation = kfp.components.create_component_from_func(
    func = model_evaluation,
    base_image='prajesh7/sail_mlflow:sail_mlflow',
    packages_to_install=['pandas', 'mlflow', 'scikit-learn', 'numpy', 'mlFlowProject']
)

In [13]:
@dsl.pipeline(
    name="sail prediction kubeflow/mlflow pipeline",
    description="testing sample"
)
def sail_prediction_pipeline(data_path: str):
    vop = dsl.VolumeOp(
        name = "t-vol",
        resource_name = "t-vol",
        size = "1Gi",
        modes=dsl.VOLUME_MODE_RWO
    )
    
    data_ingestion = create_step_data_ingestion().add_pvolumes({data_path: vop.volume})
    data_validation = create_step_data_validation().add_pvolumes({data_path: vop.volume}).after(data_ingestion)
    data_transformation = create_setp_data_transformation().add_pvolumes({data_path: vop.volume}).after(data_validation)
    model_trainer = create_step_model_training().add_pvolumes({data_path: vop.volume}).after(data_transformation)
    model_evaluation = create_step_model_evaluation().add_pvolumes({data_path: vop.volume}).after(model_trainer)
    
    data_ingestion.execution_options.caching_strategy.max_cache_staleness = "POD"
    data_validation.execution_options.caching_strategy.max_cache_staleness = "POD"
    data_transformation.execution_options.caching_strategy.max_cache_staleness = "POD"
    model_trainer.execution_options.caching_strategy.max_cache_staleness = "POD"
    model_evaluation.execution_options.caching_strategy.max_cache_staleness = "POD"

In [14]:
pwd

'c:\\Prajesh\\personal\\ML_learning\\my_work\\kubeflow_app\\kubeflow_ML_app\\ML_flow_app\\src\\mlFlowProject\\pipeline'

In [15]:
cd ../../..

c:\Prajesh\personal\ML_learning\my_work\kubeflow_app\kubeflow_ML_app\ML_flow_app


In [16]:
kfp.compiler.Compiler().compile(
    pipeline_func = sail_prediction_pipeline,
    package_path = 'sail_prediction_pipeline2.yaml'
)

In [17]:
client = kfp.Client()

In [18]:
DATA_PATH = '/artifacts'

import datetime
print(datetime.datetime.now().date())

pipeline_func = sail_prediction_pipeline
experiment_name = 'sail_exp' + "_" + str(datetime.datetime.now().date())
run_name = pipeline_func.__name__ + ' run'
namespace = "kubeflow"

arguments = {"data_path": DATA_PATH}

kfp.compiler.Compiler().compile(
    pipeline_func,
    '{}.zip'.format(experiment_name),
)

run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name = experiment_name,
    run_name = run_name,
    arguments = arguments
)

2023-08-07
