In [None]:
import prefect
from prefect import task, Flow, Parameter, Client
from prefect.run_configs import KubernetesRun
from prefect.schedules import IntervalSchedule
from prefect.storage import S3

from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet

from datetime import timedelta
import time
import getpass

import numpy as np
import pandas as pd

import mlflow
import requests

In [None]:
# Display the installed Prefect version; the imports above rely on
# `Flow`, `Client`, `prefect.run_configs` and `prefect.storage` being available.
prefect.__version__

# Model training pipeline

In [None]:
@task
def fetch_data():
    """Download the red-wine quality CSV and return it as a DataFrame.

    The file is read straight from its URL, using ``;`` as the field separator.
    """
    return pd.read_csv(
        "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv",
        sep=";",
    )

def eval_metrics(actual, pred):
    """Return the tuple ``(rmse, mae, r2)`` for ``pred`` measured against ``actual``."""
    # RMSE is the square root of the mean squared error.
    mse = mean_squared_error(actual, pred)
    return np.sqrt(mse), mean_absolute_error(actual, pred), r2_score(actual, pred)

@task
def train_model(data, mlflow_experiment_id, alpha=0.5, l1_ratio=0.5):
    """Fit an ElasticNet on the wine data and record the run in MLflow.

    Args:
        data: DataFrame with a "quality" column (the target); every other
            column is used as a feature.
        mlflow_experiment_id: ID of the MLflow experiment the run is logged under.
        alpha: ElasticNet ``alpha`` hyperparameter.
        l1_ratio: ElasticNet ``l1_ratio`` hyperparameter.
    """
    mlflow.set_tracking_uri("http://mlflow.mlflow:5000")

    # No random_state is passed here, so the split differs between runs.
    train, test = train_test_split(data)

    # The predicted column is "quality" which is a scalar from [3, 9]
    target = "quality"
    train_x, train_y = train.drop([target], axis=1), train[[target]]
    test_x, test_y = test.drop([target], axis=1), test[[target]]

    with mlflow.start_run(experiment_id=mlflow_experiment_id):
        model = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        model.fit(train_x, train_y)
        rmse, mae, r2 = eval_metrics(test_y, model.predict(test_x))

        print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        print("  RMSE: %s" % rmse)
        print("  MAE: %s" % mae)
        print("  R2: %s" % r2)

        # Record hyperparameters first, then the evaluation metrics.
        for param_name, param_value in (("alpha", alpha), ("l1_ratio", l1_ratio)):
            mlflow.log_param(param_name, param_value)
        for metric_name, metric_value in (("rmse", rmse), ("r2", r2), ("mae", mae)):
            mlflow.log_metric(metric_name, metric_value)

        mlflow.sklearn.log_model(model, "model")

In [None]:
# Installation-specific settings: edit these to match your own OpenMLOps setup.
domain = "mlops2.demo4nic.com"  # the domain where you are hosting OpenMLOps
username = "nicdemo1@alchemyst.xyz"        # the username you used to register on OpenMLOps
s3_bucket = "demo4nic-mlflow2"  # the S3 bucket you specified when setting up OpenMLOps

prefect_project_name = "wine-quality-project"         # you can use what you want here
docker_image = "smandaric/prefect:winequality-v1"    # any docker image that has the required Python dependencies

# Endpoints derived from `domain`: the Kratos login-flow URL requested by
# get_prefect_token, and the Prefect GraphQL URL passed to `Client`.
auth_url = f"https://{domain}/.ory/kratos/public/self-service/login/api"
prefect_url = f"https://prefect.{domain}/graphql"

def get_prefect_token(timeout=30):
    """Log in through the Ory Kratos API login flow and return a session token.

    Reads the module-level ``auth_url``, ``username`` and ``password``;
    ``password`` is set by the ``getpass`` cell, which must have been run
    before this function is called.

    Args:
        timeout: Seconds to wait on each HTTP request before giving up, so a
            stalled server cannot hang the notebook indefinitely.

    Returns:
        The ``session_token`` value from the login response.

    Raises:
        requests.HTTPError: If either request returns an error status.
        requests.Timeout: If a request takes longer than ``timeout``.
    """
    # Step 1: start a login flow; the response names the URL to submit credentials to.
    flow_response = requests.get(auth_url, timeout=timeout)
    # Fail here with a clear HTTP error rather than later with a JSON/KeyError.
    flow_response.raise_for_status()
    action_url = flow_response.json()["ui"]["action"]

    # Step 2: submit the credentials to that flow.
    credentials = {"password_identifier": username, "password": password, "method": "password"}
    headers = {"Accept": "application/json", "Content-Type": "application/json"}
    login_response = requests.post(action_url, json=credentials, headers=headers, timeout=timeout)
    login_response.raise_for_status()
    return login_response.json()["session_token"]

def get_tenant_id():
    """Return the ID of the first tenant available to the logged-in user.

    Returns:
        The ``"id"`` field of the first entry returned by
        ``Client.get_available_tenants()``.

    Raises:
        IndexError: If no tenants are returned.
    """
    # One login is enough: the same token authenticates the client.
    session_token = get_prefect_token()
    prefect_client = Client(api_server=prefect_url, api_key=session_token)
    return prefect_client.get_available_tenants()[0]["id"]


In [None]:
# Prompt for the password interactively instead of hardcoding it;
# get_prefect_token reads this module-level variable when it logs in.
password = getpass.getpass()    # the password you used to register on OpenMLOps

In [None]:
def create_prefect_flow(mlflow_experiment_id):
    """Build, register and start the scheduled wine-quality training flow.

    The flow runs ``fetch_data`` then ``train_model`` (alpha=0.3, l1_ratio=0.3),
    is stored in the configured S3 bucket, executes with a Kubernetes run
    config, and is scheduled every two minutes. After registering it in the
    ``prefect_project_name`` project, one flow run is created immediately.

    Args:
        mlflow_experiment_id: MLflow experiment ID forwarded to ``train_model``.

    Returns:
        The value returned by ``Client.create_flow_run`` for the run that was
        started, so the caller can track it.
    """
    run_config = KubernetesRun(
        labels=["dev"],
        service_account_name="prefect-server-serviceaccount",
        image=docker_image,
    )
    storage = S3(s3_bucket)

    session_token = get_prefect_token()
    prefect_client = Client(api_server=prefect_url, api_key=session_token, tenant_id=get_tenant_id())
    schedule = IntervalSchedule(interval=timedelta(minutes=2))

    with Flow("train-wine-quality-model", schedule, storage=storage, run_config=run_config) as flow:
        data = fetch_data()
        train_model(data=data, mlflow_experiment_id=mlflow_experiment_id, alpha=0.3, l1_ratio=0.3)

    prefect_client.create_project(project_name=prefect_project_name)
    training_flow_id = prefect_client.register(flow, project_name=prefect_project_name)
    # Hand the run ID back instead of discarding it.
    return prefect_client.create_flow_run(flow_id=training_flow_id, run_name=f"run {prefect_project_name}")
    
# Register the training flow and start a run that logs to MLflow experiment ID 1.
create_prefect_flow(mlflow_experiment_id=1)


# Model deployment pipeline

In [None]:
# NOTE(review): `environment` is not referenced by any code visible in this notebook
# (deploy_model receives its namespace as an argument) — confirm whether it is still needed.
environment = "seldon"

import yaml
import prefect
from prefect import task
from kubernetes import client, config

# YAML template for the SeldonDeployment custom resource. `modelUri` is a
# placeholder ("dummy") that deploy_model overwrites after parsing the template;
# the apiVersion suffix matches the `version` in CUSTOM_RESOURCE_INFO.
seldon_deployment = """
    apiVersion: machinelearning.seldon.io/v1alpha2
    kind: SeldonDeployment
    metadata:
      name: wines-classifier
      namespace: seldon
    spec:
      predictors:
      - graph:
          children: []
          implementation: MLFLOW_SERVER
          modelUri: dummy
          name: wines-classifier
        name: model-a
        replicas: 1
        traffic: 100
        componentSpecs:
        - spec:
            # We are setting high failureThreshold as installing conda dependencies
            # can take long time and we want to avoid k8s killing the container prematurely
            containers:
            - name: wines-classifier
              image: seldonio/mlflowserver:1.14.0-dev
              livenessProbe:
                initialDelaySeconds: 60
                failureThreshold: 300
                periodSeconds: 5
                successThreshold: 1
                httpGet:
                  path: /health/ping
                  port: http
                  scheme: HTTP
              readinessProbe:
                initialDelaySeconds: 60
                failureThreshold: 300
                periodSeconds: 5
                successThreshold: 1
                httpGet:
                  path: /health/ping
                  port: http
                  scheme: HTTP
"""

# Group/version/plural identifying the SeldonDeployment custom resource; unpacked
# as keyword arguments into the CustomObjectsApi calls in deploy_model.
CUSTOM_RESOURCE_INFO = {
    "group": "machinelearning.seldon.io",
    "version": "v1alpha2",
    "plural": "seldondeployments",
}

In [None]:
@task
def deploy_model(model_uri: str, namespace: str = "seldon"):
    """Create the SeldonDeployment for ``model_uri``, or update it if it already exists.

    The manifest is parsed from the ``seldon_deployment`` template, its
    ``modelUri`` is set to ``model_uri``, and the object is created in
    ``namespace``. If the API reports that the object already exists
    (HTTP 409), the existing object is fetched, its ``modelUri`` is updated
    and it is replaced in place.

    Args:
        model_uri: Value written to the predictor graph's ``modelUri`` field.
        namespace: Kubernetes namespace in which the custom object is
            created or replaced.

    Raises:
        kubernetes.client.rest.ApiException: For any API failure other than
            the 409 Conflict that signals an already-existing deployment.
    """
    logger = prefect.context.get("logger")

    logger.info(f"Deploying model {model_uri} to enviroment {namespace}")

    config.load_incluster_config()
    custom_api = client.CustomObjectsApi()

    dep = yaml.safe_load(seldon_deployment)
    dep["spec"]["predictors"][0]["graph"]["modelUri"] = model_uri
    # Keep the manifest's namespace in step with the one the request targets;
    # the template hardcodes "seldon", which only matches the default argument.
    dep["metadata"]["namespace"] = namespace

    try:
        resp = custom_api.create_namespaced_custom_object(
            **CUSTOM_RESOURCE_INFO,
            namespace=namespace,
            body=dep,
        )
    except client.rest.ApiException as exc:
        # Only "already exists" should fall through to the update path; auth,
        # validation and connectivity failures must surface, not be masked.
        if exc.status != 409:
            raise
    else:
        # A freshly created object may not carry a status block yet, so read it
        # defensively rather than letting a KeyError trigger a bogus update.
        state = (resp.get("status") or {}).get("state", "unknown")
        logger.info("Deployment created. status='%s'" % state)
        return

    logger.info("Updating existing model")
    # Fetch the live object so the replace call carries its current metadata.
    existing_deployment = custom_api.get_namespaced_custom_object(
        **CUSTOM_RESOURCE_INFO,
        namespace=namespace,
        name=dep["metadata"]["name"],
    )
    existing_deployment["spec"]["predictors"][0]["graph"]["modelUri"] = model_uri

    custom_api.replace_namespaced_custom_object(
        **CUSTOM_RESOURCE_INFO,
        namespace=namespace,
        name=existing_deployment["metadata"]["name"],
        body=existing_deployment,
    )

In [None]:
# Deployment flow: takes the model URI as a run-time parameter and passes it to
# deploy_model. The flow is registered below but no run is started here.
run_config = KubernetesRun(
    labels=["dev"],
    service_account_name="prefect-server-serviceaccount",
    image=docker_image,
)
storage = S3(s3_bucket)

with Flow("deploy-wine-quality-model", storage=storage, run_config=run_config) as flow:
    model_uri_parameter = Parameter("model_uri")
    deploy_model(model_uri_parameter, namespace="seldon")

# Build the tenant-scoped client once, right before it is needed; creating an
# extra client earlier would only cost an additional login request.
prefect_client = Client(api_server=prefect_url, api_key=get_prefect_token(), tenant_id=get_tenant_id())
prefect_client.create_project(project_name=prefect_project_name)
# NOTE: despite its name, this holds the ID of the *deployment* flow registered
# here; the variable name is kept so any later cell that reads it keeps working.
training_flow_id = prefect_client.register(flow, project_name=prefect_project_name)

In [None]:
# POST a single 11-value row to the deployed model's prediction endpoint,
# authenticating with a fresh session token, and display the JSON reply.
prediction_url = f"https://{domain}/seldon/seldon/wines-classifier/api/v0.1/predictions"
payload = {
    "data": {
        "ndarray": [
            [7.3, 0.35, 0.24, 2.0, 0.067, 28.0, 48.0, 0.9957600000000001, 3.43, 0.54, 10.0],
        ],
    },
}
token = get_prefect_token()
api_request = requests.post(
    prediction_url,
    json=payload,
    headers={"Authorization": f"Bearer {token}"},
    timeout=600,
)
api_request.json()