### Setup MLRun Project

In [3]:
import os
from os import path
from mlrun import set_environment, new_project, mlconf

# Set the default environment configuration. The artifact path is passed as a
# template string; the {{run.user}} / {{workflow.uid}} placeholders are not
# expanded here (presumably MLRun resolves them per run - TODO confirm).
project_name, artifact_path = set_environment(
    project="gitops-project",
    artifact_path='v3io:///users/{{run.user}}/pipe/{{workflow.uid}}',
)

# Create project
project_path = path.abspath("project")
project = new_project(name=project_name, context=project_path)

# The pipeline cell further down writes <project_path>/pipelines/train.py with
# %%writefile, which fails if the directory is missing. Create it up front so
# the notebook survives "Restart Kernel -> Run All" on a fresh checkout.
os.makedirs(path.join(project_path, "pipelines"), exist_ok=True)

### Import Functions

In [4]:
# Every function in the project runs on the same base image.
FUNCTION_IMAGE = "mlrun/mlrun"

# (name, source, kind) for each function registered on the project.
PROJECT_FUNCTIONS = [
    ("gen-iris", "components/gen_iris.py", "job"),
    ("describe", "hub://describe", "job"),
    ("train", "hub://sklearn_classifier", "job"),
    ("test", "hub://test_classifier", "job"),
    ("serving", "hub://model_server", "serving"),
    # The tester is a batch job, not a model server: the pipeline consumes it
    # with `as_step`, and registering it yields a KubejobRuntime. Declare it
    # as "job" so the declaration matches the runtime that is actually created.
    ("live_tester", "hub://model_server_tester", "job"),
]

# Register the functions one by one; the loop replaces six copy-pasted calls.
for fn_name, fn_source, fn_kind in PROJECT_FUNCTIONS:
    project.set_function(name=fn_name,
                         func=fn_source,
                         kind=fn_kind,
                         image=FUNCTION_IMAGE)

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f0c765817d0>

### Define Pipeline

In [6]:
%%writefile {path.join(project_path, 'pipelines/train.py')}
from kfp import dsl
from mlrun import mount_v3io, NewTask

# Module-level slots read by kfpipeline(); both start out empty here
# (presumably filled in by MLRun when the workflow is loaded - TODO confirm).
funcs = dict()
this_project = None

# Output key of the generated dataset, and the name of its label column.
DATASET = "iris_dataset"
LABELS = "label"

def init_functions(functions: dict, project=None, secrets=None):
    """Configure function resources and local settings.

    Applies a freshly created ``mount_v3io()`` modifier to every function
    object in ``functions``.

    :param functions: mapping whose values are the function objects to configure
    :param project:   not used by this implementation
    :param secrets:   not used by this implementation
    """
    for runtime in functions.values():
        v3io_mount = mount_v3io()
        runtime.apply(v3io_mount)

    # To collect the inference results into a stream, uncomment the line below
    # and specify a path in V3IO (<datacontainer>/<subpath>)
    # functions['serving'].set_env('INFERENCE_STREAM', 'users/admin/model_stream')

    
# Workflow definition: generate data -> summarize -> train -> test -> deploy.
# Reads the module-level `funcs` mapping and `this_project` object.
@dsl.pipeline(
    name="Demo training pipeline",
    description="Shows how to use mlrun."
)
def kfpipeline():

    # ingestion: run the generator function with its handler and params
    ingest_step = funcs['gen-iris'].as_step(
        name="get-data",
        handler='iris_generator',
        params={'format': 'pq'},
        outputs=[DATASET],
    )
    dataset_ref = ingest_step.outputs[DATASET]

    # summarize / analyze the generated dataset
    describe_step = funcs["describe"].as_step(
        name="summary",
        params={"label_column": LABELS},
        inputs={"table": dataset_ref},
    )

    # train with hyper-parameters: one iteration per candidate model class,
    # keeping the iteration selected by 'max.accuracy'
    candidate_models = [
        "sklearn.ensemble.RandomForestClassifier",
        "sklearn.linear_model.LogisticRegression",
        "sklearn.ensemble.AdaBoostClassifier",
    ]
    train_params = {
        "sample": -1,
        "label_column": LABELS,
        "test_size": 0.10,
    }
    train_step = funcs["train"].as_step(
        name="train",
        params=train_params,
        hyperparams={'model_pkg_class': candidate_models},
        selector='max.accuracy',
        inputs={"dataset": dataset_ref},
        labels={"commit": this_project.params.get('commit', '')},
        outputs=['model', 'test_set'],
    )

    # test and visualize the trained model
    test_inputs = {
        "models_path": train_step.outputs['model'],
        "test_set": train_step.outputs['test_set'],
    }
    test_step = funcs["test"].as_step(
        name="test",
        params={"label_column": LABELS},
        inputs=test_inputs,
    )

    # deploy the model as a serverless function, tagged with the commit
    # (falls back to 'v1' when the project has no 'commit' param)
    model_key = f"{DATASET}_v1"
    deploy_step = funcs["serving"].deploy_step(
        models={model_key: train_step.outputs['model']},
        tag=this_project.params.get('commit', 'v1'),
    )

#     # test out new model server (via REST API calls)
#     tester = funcs["live_tester"].as_step(name='model-tester',
#         params={'addr': deploy_step.outputs['endpoint'], 'model': f"{DATASET}_v1"},
#         inputs={'table': train_step.outputs['test_set']})

Overwriting /User/mlrun-github-actions-demo/project/pipelines/train.py


### Save Pipeline

In [9]:
# Register pipelines/train.py under the workflow name "train", then save the project.
project.set_workflow(name="train", workflow_path="pipelines/train.py")
project.save()

### Run Pipeline

In [10]:
# Launch the "train" workflow; watch=True waits for the pipeline run to complete.
run_id = project.run(
    "train",
    arguments={},
    artifact_path=artifact_path,
    dirty=True,  # NOTE(review): presumably allows running with uncommitted changes - confirm
    watch=True,
)

> 2021-07-30 18:21:50,343 [info] using in-cluster config.


> 2021-07-30 18:21:50,860 [info] Pipeline run id=0b9c3d2c-4a17-4095-a5cb-e2c77b4fc685, check UI or DB for progress
> 2021-07-30 18:21:50,860 [info] waiting for pipeline run completion


uid,start,state,name,results,artifacts
...8e8c9b87,Jul 30 18:22:42,completed,test,accuracy=0.9333333333333333; test-error=0.06666666666666667; auc-micro=0.9666666666666667; auc-weighted=0.9888888888888889; f1-score=0.9137254901960784; precision_score=0.8888888888888888; recall_score=0.9629629629629629,confusion-matrix; feature-importances; precision-recall-multiclass; roc-multiclass; test_set_preds
...76d35c56,Jul 30 18:22:14,completed,summary,,histograms; violin; imbalance; imbalance-weights-vec; correlation-matrix; correlation
...5f509d1a,Jul 30 18:22:12,completed,train,best_iteration=1; accuracy=0.975609756097561; test-error=0.024390243902439025; auc-micro=0.9973230220107079; auc-weighted=0.9966358284272497; f1-score=0.9721739130434783; precision_score=0.9743589743589745; recall_score=0.9722222222222222,test_set; confusion-matrix; feature-importances; precision-recall-multiclass; roc-multiclass; model; iteration_results
...b8fac36c,Jul 30 18:22:00,completed,get-data,,iris_dataset
