In [1]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
import mlflow
import pandas as pd
import numpy as np
from prefect import task, Flow
from datetime import timedelta
from prefect.schedules import IntervalSchedule

import os
import warnings
import sys

import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from urllib.parse import urlparse
import mlflow
import mlflow.sklearn
import mlflow.pyfunc

import logging

In [2]:
def fetch_data():
    """Download the red-wine quality CSV and return it as a DataFrame.

    The remote file is semicolon-delimited, so the separator is passed
    explicitly to ``pd.read_csv``.
    """
    return pd.read_csv(
        "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv",
        sep=";",
    )

# Load the dataset into a notebook-level variable and preview its first rows.
data = fetch_data()
data.head()

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5


In [3]:
# logging.basicConfig(level=logging.WARN)
# logger = logging.getLogger(__name__)



def download_data():
    """Read the wine-quality CSV from its URL and return it as a DataFrame.

    Returns:
        The semicolon-delimited CSV parsed by ``pd.read_csv``.

    Raises:
        Exception: whatever ``pd.read_csv`` raised. The failure is logged with
            its traceback and then re-raised, so callers never receive an
            undefined result.
    """
    # Read the wine-quality csv file from the URL
    csv_url = (
        "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    )

    # Resolve the logger here: no module-level `logger` is configured in this
    # notebook, so referencing one in the except branch would raise NameError.
    log = logging.getLogger(__name__)

    try:
        return pd.read_csv(csv_url, sep=";")
    except Exception as e:
        log.exception(
            "Unable to download training & test CSV, check your internet connection. Error: %s", e
        )
        # Re-raise instead of falling through to a `return` of an unbound name.
        raise

    
def train_model(train,  alpha=0.5, l1_ratio=0.5):
    """Fit an ElasticNet regressor on the supplied training frame.

    Args:
        train: DataFrame containing the feature columns plus a "quality"
            column, which is used as the regression target.
        alpha: ElasticNet regularisation strength.
        l1_ratio: ElasticNet L1/L2 mixing parameter.

    Returns:
        The fitted ``ElasticNet`` estimator.
    """
    # NOTE: this changes the process-wide warning filters, not just this call.
    warnings.filterwarnings("ignore")
    np.random.seed(40)

    # Fit on the frame that was passed in. Re-splitting a notebook-global
    # frame here would ignore the caller's data and discard part of it.

    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    train_y = train[["quality"]]

    lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
    lr.fit(train_x, train_y)
    print('training the model is complete')
    return lr




def eval_metrics(test, model=None):
    """Score ``model`` on ``test`` and record the results in MLflow.

    A new MLflow run is started in the '/wine_experiment' experiment; RMSE,
    R2 and MAE are logged as metrics and the model is logged as an artifact.
    When the tracking URI scheme is not "file", the model is also registered
    under the name "ElasticnetWineModel".

    Args:
        test: DataFrame with the feature columns plus a "quality" column.
        model: fitted estimator exposing ``predict``. Required, despite the
            ``None`` default kept for signature compatibility.

    Returns:
        Tuple ``(rmse, mae, r2)``.

    Raises:
        ValueError: if ``model`` is None.
    """
    # Fail before touching MLflow so a missing model does not leave an empty
    # run behind or surface as an AttributeError on `None.predict`.
    if model is None:
        raise ValueError("eval_metrics requires a fitted model, got None")

    mlflow.set_experiment('/wine_experiment')

    with mlflow.start_run():
        # The predicted column is "quality" which is a scalar from [3, 9]
        test_x = test.drop(["quality"], axis=1)
        actual = test[["quality"]]

        pred = model.predict(test_x)
        rmse = np.sqrt(mean_squared_error(actual, pred))
        mae = mean_absolute_error(actual, pred)
        r2 = r2_score(actual, pred)
        print("  RMSE: %s" % rmse)
        print("  MAE: %s" % mae)
        print("  R2: %s" % r2)

        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

        # Model registry does not work with file store
        if tracking_url_type_store != "file":
            # Register the model
            # There are other ways to use the Model Registry, which depends on the use case,
            # please refer to the doc for more information:
            # https://mlflow.org/docs/latest/model-registry.html#api-workflow
            mlflow.sklearn.log_model(model, "model", registered_model_name="ElasticnetWineModel")
        else:
            mlflow.sklearn.log_model(model, "model")

    return rmse, mae, r2



# to run the above functions manually without prefect, uncomment this code
# Split the data into training and test sets. (0.75, 0.25) split.
# data = download_data()

# train, test = train_test_split(data)

# train_model(train)


# model = mlflow.sklearn.load_model("runs:/a19eb3e5531d496893f334c68e17d5fa/model")
# if model != None:
#     (rmse, mae, r2) = eval_metrics(test, model)

In [4]:
from prefect import task, Flow, Parameter, Client
from prefect.run_configs import KubernetesRun
from prefect.schedules import IntervalSchedule
from prefect.storage import S3
from prefect.tasks.control_flow import merge
from prefect import task, Flow, case
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from datetime import timedelta
import numpy as np
import pandas as pd
import mlflow
import requests
from typing import Tuple

In [5]:

@task
def get_best_model_run_id(experiment_name):
    """Return the run id with the lowest logged RMSE in an MLflow experiment.

    Args:
        experiment_name: name of the MLflow experiment to search.

    Returns:
        The ``run_id`` of the first run when ordered by ``metrics.rmse``
        ascending, or ``None`` when the experiment does not exist or has no
        runs.
    """
    experiment = mlflow.get_experiment_by_name(experiment_name)
    # No experiment has been created under this name yet (e.g. nothing has
    # been logged so far): report "no model" instead of failing on None.
    if experiment is None:
        return None

    runs = mlflow.search_runs([experiment.experiment_id], order_by=["metrics.rmse ASC"])
    if len(runs) == 0:
        return None

    # Positional access: take the first row of the ordered result regardless
    # of how the returned frame is indexed.
    best_run_id = runs["run_id"].iloc[0]
    print('best run id is:  ', best_run_id)
    return best_run_id
    

@task(nout=2)
def fetch_data_task():
    """Download the wine-quality CSV and split it into train and test frames.

    Returns:
        Tuple ``(train, test)`` as produced by ``train_test_split`` with its
        default arguments.
    """
    wine_frame = pd.read_csv(
        "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv",
        sep=";",
    )
    train_part, test_part = train_test_split(wine_frame)
    return train_part, test_part
 
@task
def load_model_task(run_id):
    """Load the sklearn model logged under ``run_id`` from MLflow.

    Args:
        run_id: MLflow run id whose "model" artifact should be loaded, or
            ``None`` when no run is available.

    Returns:
        The loaded model, or ``None`` when ``run_id`` is ``None``.
    """
    # Identity check is the idiomatic (and __ne__-proof) test for None.
    if run_id is None:
        return None
    return mlflow.sklearn.load_model(f"runs:/{run_id}/model")

@task
def train_model_task(train_data,  alpha=0.5, l1_ratio=0.5):
    """Prefect task wrapper around ``train_model``.

    Args:
        train_data: training frame passed to ``train_model``.
        alpha: regularisation strength passed to ``train_model``.
        l1_ratio: L1/L2 mixing parameter passed to ``train_model``.

    Returns:
        The model returned by ``train_model``.
    """
    # Forward the caller's hyper-parameters rather than re-stating the
    # defaults, so e.g. alpha=0.3 set in the flow actually takes effect.
    return train_model(train_data, alpha=alpha, l1_ratio=l1_ratio)
    

@task(nout=3)
def eval_model_task(test_data, model):
    """Prefect task wrapper around ``eval_metrics``.

    Returns:
        Tuple ``(rmse, mae, r2)`` as returned by ``eval_metrics``.
    """
    scores = eval_metrics(test_data, model)
    root_mse, abs_err, r_squared = scores
    return root_mse, abs_err, r_squared

@task
def is_model_None_condition(model):
    """Return True when ``model`` is None, False otherwise."""
    return model is None

@task
def is_model_degraded_condition(rmse, threshold=0.4):
    """Report whether the RMSE is above the acceptable threshold.

    Args:
        rmse: root-mean-squared error of the evaluated model.
        threshold: RMSE value above which the model counts as degraded.
            Defaults to 0.4, the previously hard-coded value.

    Returns:
        True when ``rmse`` is strictly greater than ``threshold``, else False.
    """
    # bool() keeps the result a plain Python bool even for numpy scalars.
    return bool(rmse > threshold)

In [6]:
# Run the flow every two minutes.
schedule = IntervalSchedule(interval=timedelta(minutes=2))


# Flow definition: look up the best logged run, load its model, fetch fresh
# data, train and evaluate a model, and retrain when the RMSE is too high.
with Flow("MLOps", schedule = schedule) as flow:
    
    run_id = get_best_model_run_id(experiment_name = '/wine_experiment')    
    model = load_model_task(run_id)
    
    # condition
    # NOTE(review): `model_None` is computed but no later task in this flow
    # consumes it — confirm whether a `case(model_None, ...)` branch is missing.
    model_None = is_model_None_condition(model)
    train_data, test_data = fetch_data_task()
    
    # Intended logic: run inference, and fire up the training if the error
    # (RMSE) is above a certain level; also fire up the training if model is
    # None, i.e. no training until now.
    # NOTE(review): the next line rebinds `model`, so the model loaded above is
    # never the one evaluated here — a new model is always trained first.
     
    model = train_model_task(train_data, alpha=0.3, l1_ratio=0.3)    
    rmse, mae, r2 = eval_model_task(test_data, model)        
    is_model_degraded = is_model_degraded_condition(rmse)

    # train on new data if error is more than threshold and log the results on test data
    with case(is_model_degraded, True): 
        model = train_model_task(train_data)        
        # The retrained RMSE is bound to `rmse_`, so `rmse` still refers to the
        # first evaluation; `mae` and `r2` are rebound to this branch's tasks.
        rmse_, mae, r2  = eval_model_task(test_data, model)

    
# NOTE(review): with a schedule attached, `flow.run()` presumably keeps running
# on that schedule instead of returning after one execution, in which case the
# print below is not reached — confirm; `run_on_schedule=False` requests a
# single run.
state = flow.run()
# Prints the RMSE of the first evaluation task (the `rmse` bound above).
print('rmse is: ', state.result[rmse].result)
# flow.visualize()

[2022-07-19 22:16:27+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'MLOps'
[2022-07-19 22:16:27+0200] INFO - prefect.TaskRunner | Task 'get_best_model_run_id': Starting task run...
best run id is:   383e7a732ff247b79a044c44d4c63eaf
[2022-07-19 22:16:27+0200] INFO - prefect.TaskRunner | Task 'get_best_model_run_id': Finished task run for task with final state: 'Success'
[2022-07-19 22:16:27+0200] INFO - prefect.TaskRunner | Task 'load_model_task': Starting task run...
[2022-07-19 22:16:27+0200] INFO - prefect.TaskRunner | Task 'load_model_task': Finished task run for task with final state: 'Success'
[2022-07-19 22:16:27+0200] INFO - prefect.TaskRunner | Task 'is_model_None_condition': Starting task run...
[2022-07-19 22:16:27+0200] INFO - prefect.TaskRunner | Task 'is_model_None_condition': Finished task run for task with final state: 'Success'
[2022-07-19 22:16:27+0200] INFO - prefect.TaskRunner | Task 'fetch_data_task': Starting task run...
[2022-07-19 22:16:29+0200] INFO -