In [1]:
import posixpath
from os import path

import mlrun

In [2]:
# Load the MLRun project from its local context directory

project_name_base = 'securenet'
project_path = 'conf'

# `securenet_proj` and `project` are two names for the same loaded project
# object; the cells below use `project`.
project = securenet_proj = mlrun.projects.load_project(project_path, clone=True)

print(f'Project path: {project_path}\nProject name: {project.name}')

Project path: conf
Project name: securenet-floyed


In [3]:
# Collect the names of the functions stored in the run DB for this project
# (tag 'latest'), skipping records whose 'kind' is an empty string.
# The project name is passed as a plain string; wrapping it in braces outside
# an f-string would build a one-element set instead.
function_records = mlrun.get_run_db().list_functions(project=project.name, tag='latest')
names = [func.get('metadata').get('name')
         for func in function_records
         if func.get('kind') != '']
print(names)

['test-classifier', 'v2-model-tester', 'describe', 'sklearn-classifier', 'xgb-trainer', 'xgb-test', 'data-clean', 'v2-model-server', 'serving', 'train-data']


## Importing functions

In [4]:
# Register the functions used by the workflow on the project object.
# Each entry holds the positional arguments of one `set_function` call:
# (url,) or (url, name). Registration order is preserved.
registrations = [
    (f'db://{project.name}/data-clean',),
    (f'db://{project.name}/describe',),
    # (f'db://{project.name}/train-data',),  # left disabled, as before
    ('hub://v2_model_tester', 'live_tester'),
    (f'db://{project.name}/serving',),
]
for registration in registrations:
    project.set_function(*registration)

# Kept as the last expression so the cell still displays the returned object.
project.set_function('hub://test_classifier', 'test')

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7ff87a190c90>

In [5]:
# Display the function entries currently registered on the project object
project.functions

[{'name': 'train-data',
  'spec': {'kind': 'job',
   'metadata': {'name': 'train-data',
    'tag': 'latest',
    'hash': '0c2ffcee3373c9b07757888ed399b8804ec5e91b',
    'project': 'securenet-floyed',
    'categories': [],
    'updated': '2021-07-06T22:41:24.172655+00:00'},
   'spec': {'command': '',
    'args': [],
    'image': 'mlrun/mlrun',
    'env': [{'name': 'V3IO_API', 'value': ''},
     {'name': 'V3IO_USERNAME', 'value': ''},
     {'name': 'V3IO_ACCESS_KEY', 'value': ''}],
    'default_handler': '',
    'entry_points': {'train_data': {'name': 'train_data',
      'doc': '',
      'parameters': [{'name': 'context', 'default': ''},
       {'name': 'dataset', 'type': 'DataItem', 'default': ''},
       {'name': 'label_column', 'type': 'str', 'default': 'label'}],
      'outputs': [{'default': ''}],
      'lineno': 14}},
    'description': '',
    'build': {'functionSourceCode': 'IyBHZW5lcmF0ZWQgYnkgbnVjbGlvLmV4cG9ydC5OdWNsaW9FeHBvcnRlcgoKZnJvbSBza2xlYXJuIGltcG9ydCBwcmVwcm9jZXNzaW5nCm

## Kubeflow pipeline

In [6]:
%%writefile {path.join(project_path, 'workflow.py')}

from kfp import dsl
import mlrun
from mlrun.platforms import auto_mount

funcs = {}  # indexed by function name inside kfpipeline(); presumably filled in by MLRun when the workflow is loaded — TODO confirm
data_source_url = 'store:///raw_data'  # passed as the "src" input of the data-clean step
labels = 'label'  # value of the "label_column" param given to the describe/train/test steps
GPUS = False  # not referenced anywhere in the visible workflow code

cleaned_dataset = 'cleaned_data'  # output key of the data-clean step, consumed by the describe and train steps
model = 'logisticRegression_model'  # prefix of the model key ("<model>_v1") used in the deploy step


# Configure function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    """Apply the ``auto_mount()`` modifier to every function in ``functions``.

    Args:
        functions: Mapping whose values expose an ``apply`` method.
        project: Not used by this function.
        secrets: Not used by this function.
    """
    for function_name in functions:
        functions[function_name].apply(auto_mount())

# Create a Kubeflow Pipelines pipeline
@dsl.pipeline(
    name="SecureNet",
    description="The workflow implements data processing, training and testing the model and serving the model",
)
def kfpipeline():
    """Define the pipeline steps: data clean, summary, train, test and deploy.

    The steps are built from the module-level ``funcs`` mapping, and each
    later step is wired to the outputs of the step it depends on.
    """
    # Ingest the data set.
    # NOTE(review): 'cleaned_key' is passed under `inputs` rather than
    # `params` — confirm the data_clean handler expects it there.
    ingest = funcs['data-clean'].as_step(
        name='data_clean',
        handler='data_clean',
        inputs={"src": data_source_url, 'cleaned_key': cleaned_dataset},
        outputs=[cleaned_dataset],
    )

    # Analyze the cleaned data.
    funcs['describe'].as_step(
        name="summary",
        params={"label_column": labels},
        inputs={"table": ingest.outputs[cleaned_dataset]},
    )

    # Train a model on the cleaned data.
    train = funcs["train-data"].as_step(
        name="train_data",
        params={"label_column": labels},
        inputs={"dataset": ingest.outputs[cleaned_dataset]},
        outputs=['model', 'test_set'],
    )
    trained_model = train.outputs['model']

    # Test the trained model against the test set produced by the train step.
    funcs["test"].as_step(
        name="test",
        params={"label_column": labels},
        inputs={"models_path": trained_model,
                "test_set": train.outputs['test_set']},
    )

    # Deploy the model as a serverless function.
    funcs["serving"].deploy_step(models={f"{model}_v1": trained_model})

Overwriting conf/workflow.py


## Register workflow

In [7]:
# Register the workflow file under the name "infer"
project.set_workflow('infer', 'workflow.py')

In [8]:
# Save the project after the function and workflow registrations above
project.save()

In [9]:
import os 
from os import environ, path
from mlrun import mlconf
# Display the artifact path currently set in the MLRun configuration
mlconf.artifact_path

'v3io:///projects/{{run.project}}/artifacts'

In [11]:
# Run the workflow registered as 'infer', storing its artifacts under
# "<configured artifact path>/pipeline/{{workflow.uid}}".
pipeline_path = mlconf.artifact_path

# The artifact path is a URL-style target (e.g. "v3io:///..."), so it is
# joined with '/' on every platform instead of the OS-specific separator.
workflow_artifact_path = posixpath.join(pipeline_path, "pipeline", '{{workflow.uid}}')

run_id = project.run(
    'infer',
    arguments={},
    artifact_path=workflow_artifact_path,
    dirty=True,
    watch=True)