In [85]:
import cogflow as cf
import requests
import boto3
import json
import os
      
cf.get_dataset(name="bola")
downloaded_file_name = 'bola.yaml'

web_downloader_op = cf.load_component(file_path=downloaded_file_name)

Downloaded bola.yaml from mlflow


In [86]:
def preprocess(file_path: cf.input_path('CSV'),
              output_file: cf.output_path('parquet')):
    import pandas as pd
    df = pd.read_csv(file_path, header=0, sep=";")
    df.columns = [c.lower().replace(' ', '_') for c in df.columns]
    df.to_parquet(output_file)

In [87]:
preprocess_op=cf.create_component_from_func(
        func=preprocess,
        output_component_file='preprocess-component.yaml',
        base_image='hiroregistry/cogflow:1.9.36b1',
        packages_to_install=[])

In [88]:

def training(file_path: cf.input_path('parquet'))->dict:
    
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import classification_report
    from sklearn.linear_model import ElasticNet
    import os
    from datetime import datetime
    import numpy as np
    import cogflow as cf
    import json
    df = pd.read_parquet(file_path)
    #os.environ['API_BASEPATH']= 'http://model-register-service/cogapi/'
    print("ENV_VAR",os.getenv("MLFLOW_S3_ENDPOINT_URL"))
    
    cf.autolog()
    
    target_column='quality'
    train_x, test_x, train_y, test_y = train_test_split(df.drop(columns=[target_column]),
                                                    df[target_column], test_size=.25,
                                                    random_state=1337, stratify=df[target_column])  

    with cf.start_run() as run:
        alpha =  0.5
        l1_ratio =  0.5
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)
        cf.log_param("alpha",alpha)
        cf.log_param("l1_ratio",l1_ratio)
        
        model_name = "wine-elasticnet"
        run_id = run.info.run_id
        artifact_path = "model"
        result = cf.log_model(lr, artifact_path, registered_model_name= "wine-elasticnet")

        print(f"Artifact_uri", run.info.artifact_uri)
        print(f"Artifact_path", artifact_path)
        print(f"run_id", run_id)
        print(f"model_name", model_name)
        return run.info.artifact_uri
    
    

In [89]:
training_op=cf.create_component_from_func(
        func=training,
        output_component_file='train-component.yaml', 
        base_image='hiroregistry/cogflow:1.9.36b1',
        packages_to_install=[])

In [90]:
def get_registered_model_details(model_name: str, run_id: str) -> dict:

    from cogflow import cogclient
    client = cogclient
    
    # Filter the registered model based on model name
    filter_string = f"name='{model_name}' AND run_id='{run_id}'"
    registered_model = client.search_registered_models(filter_string=filter_string)
    
    return registered_model

In [91]:
get_model_details_op = cf.create_component_from_func(
    func=get_registered_model_details,
    output_component_file='get-model-details-component.yaml',
    base_image='hiroregistry/cogflow:1.9.36b1',
    packages_to_install=[])

In [92]:

def serving(model_uri,name):
    import cogflow as cf
    cf.serve_model_v1(model_uri,name)
    

In [93]:
kserve_op=cf.create_component_from_func(func=serving,
        output_component_file='kserve-component.yaml',
        base_image='hiroregistry/cogflow:1.9.36b1',
        packages_to_install=[])

In [94]:
def getmodel(name):
    import cogflow as cf
    cf.get_model_url(name)
    

In [95]:
getmodel_op=cf.create_component_from_func(func=getmodel,
        output_component_file='kserve-component.yaml',
        base_image='hiroregistry/cogflow:1.9.36b1',
        packages_to_install=[])

In [96]:

@cf.pipeline(name="pipeline", description="WINE pipeline")
def wine_pipeline(url, isvc):
    web_downloader_task = web_downloader_op(url=url)
    preprocess_task = preprocess_op(file=web_downloader_task.outputs['data'])
    
    train_task = (training_op(file=preprocess_task.outputs['output']));
    train_task=train_task.AddModelAccess()    
    
    kserve_task=kserve_op(model_uri=train_task.output,name=isvc)
    kserve_task.after(train_task)
    
    getmodel_task=getmodel_op(isvc)
    getmodel_task.after(kserve_task)


In [97]:

client = cf.client()

client.create_run_from_pipeline_func(
    wine_pipeline,
    arguments={
        "url": "https://raw.githubusercontent.com/Barteus/kubeflow-examples/main/e2e-wine-kfp-mlflow/winequality-red.csv",
    "isvc":"sample-test1"}
)



RunPipelineResult(run_id=1189b65a-655a-4f5e-88f6-35f64ef9c004)

In [98]:
import requests
inference_input = {
"instances": [
    [10.1, 0.37, 0.34, 2.4, 0.085, 5.0, 17.0, 0.99683, 3.17, 0.65, 10.6]]
}
response = requests.post(f"http://10.100.184.43/v1/models/model:predict", json=inference_input)
print(response.text)

{"predictions":[5.741928028712652]}
