In [14]:
import json
import boto3
import pickle
import pyarrow

import numpy as np
import pandas as pd

from datetime import datetime
from dateutil.relativedelta import relativedelta

from sklearn.compose import ColumnTransformer
# from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline

from sklearn.feature_extraction import DictVectorizer

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

from sklearn.linear_model import LinearRegression, Ridge
from sklearn.ensemble import RandomForestRegressor

# import xgboost as xgb
from xgboost import XGBRegressor

import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType

from hyperopt import STATUS_OK, Trials, fmin, hp, tpe, space_eval
from hyperopt.pyll import scope

from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner

# mlflow.set_tracking_uri("http://127.0.0.1:5000/")


def read_file(key, bucket='kkr-mlops-zoomcamp'):

    session = boto3.session.Session()
    s3 = session.client(
        service_name='s3',
        endpoint_url='https://storage.yandexcloud.net',
        region_name='ru-central1',
        # aws_access_key_id = "id",
        # aws_secret_access_key = "key")
    )
    obj = s3.get_object(Bucket=bucket, Key=key)

    data = pd.read_csv(obj['Body'], sep=",")

    return data


@task
def load_data(current_date = "2015-5-17", periods = 1):
    
    dt_current = datetime.strptime(current_date, "%Y-%m-%d")
    
    if periods == 1:
        date_file = dt_current + relativedelta(months = - 1)
        print(f"Getting TEST data for {date_file.year}-{date_file.month} period")
        test_data = read_file(key = f"datasets/car-prices-{date_file.year}-{date_file.month}.csv")

        return test_data

    else:
        train_data = pd.DataFrame()
        for i in range(periods+1, 1, -1):
            date_file = dt_current + relativedelta(months = - i)
            try:
                data = read_file(key = f"datasets/car-prices-{date_file.year}-{date_file.month}.csv")
                print(f"Getting TRAIN data for {date_file.year}-{date_file.month} period")
            except:
                print(f"Cannot find file car-prices-{date_file.year}-{date_file.month}.csv",
                    "using blank")
                data = None
                
            train_data = pd.concat([train_data, data])
        
        return train_data


@task
def na_filter(data):
    work_data = data.copy()
    non_type = work_data[data['make'].isna() | data['model'].isna() | data['trim'].isna()].index
    work_data.drop(non_type, axis=0, inplace=True)

    y = work_data.pop('sellingprice')

    return work_data, y


class FeaturesModifier:
    def __init__(self, columns):
        self.columns = columns

    def fit(self, work_data, _ = None):
        return self

    def transform(self, work_data, _ = None):

        work_data = pd.DataFrame(work_data, columns = self.columns)
        work_data['make_model_trim'] = work_data['make'] + '_'  + work_data['model'] + '_' + work_data['trim']
        work_data['year'] = work_data['year'].astype('str')
        
        cat_cols = ['year', 'make_model_trim', 'body', 'transmission', 'color', 'interior']
        num_cols = ['condition', 'odometer', 'mmr']

        X = work_data[cat_cols + num_cols].copy()
        X_dict = X.to_dict(orient = 'records')

        return X_dict

    def fit_transform(self, work_data, _ = None):
        return self.transform(work_data)


@task
def prepare_features(work_data, preprocessor = None):

    num_2_impute = ['condition', 'odometer', 'mmr']
    cat_2_impute = ['body', 'transmission']
    constant_2_impute = ['color', 'interior']
    others = ['year', 'make', 'model', 'trim']
    
    if not preprocessor:
        features_filler = ColumnTransformer([
            ('num_imputer', SimpleImputer(missing_values=np.nan, strategy='mean'), num_2_impute),
            ('cat_imputer', SimpleImputer(missing_values=np.nan, strategy='most_frequent'), cat_2_impute),
            ('cat_constant', SimpleImputer(missing_values=np.nan, strategy='most_frequent'), constant_2_impute),
            ('others', SimpleImputer(missing_values=np.nan, strategy='constant', fill_value='-1'), others )
            ]
        )

        fm = FeaturesModifier(columns = num_2_impute + cat_2_impute + constant_2_impute + others)

        dv = DictVectorizer() 

        preprocessor = Pipeline(steps = [
            ('filler', features_filler),
            ('modifier', fm),
            ('dict_vectorizer', dv)
        ])
        
        X = preprocessor.fit_transform(work_data)

    else:
        X = preprocessor.transform(work_data)

    return X, preprocessor


@task
def params_search(MLFLOW_TRACKING_URI, train, valid, y_train, y_valid, current_date, models):
    
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

    best_models = []

    for baseline in models:
        
        mlflow.set_experiment(f"{baseline.__name__}-models")
        search_space = models[baseline]

        def objective(params):

            with mlflow.start_run():
                mlflow.set_tag("baseline", f"{baseline.__name__}")
                mlflow.log_param("training-data", current_date[:7])
                mlflow.log_param("parameters", params)
                
                print('$$$ Serching for the best parameters... $$$')

                training_model = baseline(**params)
                training_model.fit(train, y_train)

                print('$$$ Predicting on the valid dataset... $$$')
                prediction_valid = training_model.predict(valid)
                rmse_valid = mean_squared_error(y_valid, prediction_valid, squared = False)

                print('$$$ RMSE on valid $$$', 
                    rmse_valid
                    )
                mlflow.log_metric('rmse', rmse_valid)
            

            return {'loss': rmse_valid, 'status': STATUS_OK}
        
        best_result = fmin(fn = objective,
                    space = search_space,
                    algo = tpe.suggest,
                    max_evals = 2, # int(2**(len(models[baseline].items())-2)), #3,
                    trials = Trials(),
                    ) 
        
        print("$$$ Best model $$$\n", baseline(**space_eval(search_space, best_result)))
        best_models.append(baseline(**space_eval(search_space, best_result)))

        mlflow.end_run()
    
    return best_models #ML_model(**space_eval(search_space, best_result))

@task
def train_best_models(MLFLOW_TRACKING_URI, best_models_experiment, train, y_train, X_valid, y_valid, X_test, y_test, preprocessor, models):

    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    mlflow_client = MlflowClient(tracking_uri = MLFLOW_TRACKING_URI)

    best_pipelines = []

    mlflow.autolog()
    for model in models:
    
        experiment = mlflow.set_experiment(f"{model.__name__}-models")

        best_run = mlflow_client.search_runs(
                experiment_ids = experiment.experiment_id,
                run_view_type=ViewType.ACTIVE_ONLY,
                max_results = 2,
                order_by = ['metrics.rmse ASC']
            )
        
        print(f"$$$ Training {model.__name__} with best params $$$")

        mlflow.set_experiment(best_models_experiment) #"Auction-car-prices-best-models")
    
        with mlflow.start_run():
            
            mlflow.log_param("test_dataset", X_test["saledate"].max()[:7])

            best_params = json.loads(best_run[0].data.params['parameters'].replace("'", "\""))
            staged_model = model(**best_params).fit(train, y_train)
            
            pipeline = Pipeline(
                steps = [
                    ('preprocessor', preprocessor),
                    ('model', staged_model)
                ]
            )
            predict_valid = pipeline.predict(X_valid)
            rmse_valid = mean_squared_error(y_valid, predict_valid, squared = False)

            predict_test = pipeline.predict(X_test)
            rmse_test = mean_squared_error(y_test, predict_test, squared = False)

            mlflow.log_metric("rmse_valid", rmse_valid)
            mlflow.log_metric("rmse_test", rmse_test)
            mlflow.sklearn.log_model(pipeline, artifact_path='full-pipeline')
            
            best_pipelines.append((model.__name__, pipeline))

            print("$$$ {:} MODEL was saved as a RUN of {:} $$$".format(model.__name__, best_models_experiment))

            mlflow.end_run()

    return best_pipelines


@task
def model_to_registry(MLFLOW_TRACKING_URI, best_models_experiment, model_name):

    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    mlflow_client = MlflowClient(tracking_uri = MLFLOW_TRACKING_URI)
    
    experiment = mlflow.set_experiment(best_models_experiment) #'Auction-car-prices-best-models')

    best_model_run = mlflow_client.search_runs(
        experiment_ids=experiment.experiment_id,
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=1,
        order_by=["metrics.rmse_test ASC"]
            
        )
    RUN_ID = best_model_run[0].info.run_id
    model_uri = "runs:/{:}/full-pipeline".format(RUN_ID)
   
    print(f"$$$ Registering model {model_name} $$$")
    registered_model = mlflow.register_model(
            model_uri=model_uri,
            name = model_name
        )
    print(f"$$$ Model RUN_ID {registered_model.run_id} was registered as version {registered_model.version} at {registered_model.current_stage} stage $$$")
    
    return registered_model


@task
def model_promotion(MLFLOW_TRACKING_URI, current_date, model_name, registered_model_version, to_stage):

    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    mlflow_client = MlflowClient(tracking_uri = MLFLOW_TRACKING_URI)

    promoted_model = mlflow_client.transition_model_version_stage(
                                name = model_name,
                                version = registered_model_version,
                                stage = to_stage,
                                archive_existing_versions=True
                                )

    mlflow_client.update_model_version(
        name = model_name,
        version = registered_model_version,
        description=f'The model was promoted to {to_stage} {current_date}'
        )
    print(f"$$$ Model {model_name} version {registered_model_version} was promoted to {to_stage} {current_date} $$$")

    return promoted_model


def load_model(MLFLOW_TRACKING_URI, model_name, stage=None, version=None, run_id=None):
    
    mlflow_client = MlflowClient(MLFLOW_TRACKING_URI)
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

    versions = mlflow_client.get_latest_versions(
                name=model_name,
                stages=[stage]
                )

    try:
        model_uri = f"models:/{model_name}/{versions[0].current_stage}"
        model = mlflow.pyfunc.load_model(model_uri = model_uri)

        return model, versions[0].version
    except:
        print(f"$$$ There are no models at {stage} stage")
        
        return None, None


@task
def switch_model_of_production(MLFLOW_TRACKING_URI, X_test, y_test, model_name, current_date):
    # mlflow.set_tracking_uri("http://127.0.0.1:5000/")

    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    mlflow_client = MlflowClient(MLFLOW_TRACKING_URI)

    staging_model, staging_version = load_model(MLFLOW_TRACKING_URI, model_name, stage = "Staging")
    if staging_model:
        staging_test_prediction = staging_model.predict(X_test)
        rmse_staging = mean_squared_error(staging_test_prediction, y_test, squared=False)
        print(staging_version, rmse_staging)
    else:
        rmse_staging = np.inf

    production_model, production_version = load_model(MLFLOW_TRACKING_URI, model_name, stage = "Production")
    if production_model:
        production_test_prediction = production_model.predict(X_test)
        rmse_production = mean_squared_error(production_test_prediction, y_test, squared=False)
        print(production_version, rmse_production)

    else:
        rmse_production = np.inf

    if rmse_staging <= rmse_production:
        print(f"$$$ Need to switch models. Version {staging_version} is better than {production_version} $$$")
        return staging_version
        
    else:
        print(f"$$$ No need to switch models. Version {production_version} is the best $$$")
        return None


@flow(task_runner = SequentialTaskRunner())
def main(current_date = "2015-5-21", periods = 5):
 
    MLFLOW_TRACKING_URI = 'sqlite:///../mlops-project.db'
    best_models_experiment = "Auction-car-prices-best-models"
    model_name = "Auction-car-prices-prediction"
    
    train_data = load_data(current_date = current_date, periods = periods)
    X, y = na_filter(train_data)

    test_data = load_data(current_date = current_date)
    X_test, y_test = na_filter(test_data)

    X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.25, random_state=42)

    print("$$$ Training preprocessor... $$$")
    train, preprocessor = prepare_features(X_train, preprocessor = None )
    valid, _  = prepare_features(X_valid, preprocessor)

    print("$$$ Initializing parameters for baseline models... $$$")
    models = {
        LinearRegression: {
            "fit_intercept": hp.choice("fit_intercept", ('True', 'False'))
            },
        Ridge: {"alpha": hp.loguniform("alpha", -5, 5),
                "fit_intercept": hp.choice("fit_intercept", ('True', 'False'))
            },
        # RandomForestRegressor: {
        #         'max_depth': scope.int(hp.quniform('max_depth', 1, 20, 1)),
        #         'n_estimators': scope.int(hp.quniform('n_estimators', 10, 50, 1)),
        #         'min_samples_split': scope.int(hp.quniform('min_samples_split', 2, 10, 1)),
        #         'min_samples_leaf': scope.int(hp.quniform('min_samples_leaf', 1, 4, 1)),
        #         'random_state': 42
        #         },
        # XGBRegressor: {
        #         'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
        #         'learning_rate': hp.loguniform('learning_rate', -3, 0),
        #         'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
        #         'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
        #         'max_child_weight': hp.loguniform('max_child_weight', -1, 3),
        #         'num_boost_rounds': 100,
        #         # 'early_stopping_rounds': 20,
        #         'objective': 'reg:squarederror',
        #         'seed': 42,
        #         }
        }

    best_models = params_search(
        MLFLOW_TRACKING_URI,
        train, valid,
        y_train, y_valid,
        current_date,
        models)

    # print("$$$ Best params $$$\n", best_models)

    best_pipelines = train_best_models(
        MLFLOW_TRACKING_URI,
        best_models_experiment,
        train, y_train,
        X_valid, y_valid,
        X_test, y_test,
        preprocessor,
        models = models
        )

    registered_model = model_to_registry(MLFLOW_TRACKING_URI, best_models_experiment, model_name)
    # print(registered_model.run_id, registered_model.version)

    model_promotion(
        MLFLOW_TRACKING_URI,
        current_date,
        model_name,
        registered_model_version=registered_model.version,
        to_stage = "Staging"
        )
    
    switch_to_version = switch_model_of_production(MLFLOW_TRACKING_URI, X_test, y_test, model_name, current_date)
    
    if switch_to_version:
        pass
        model_promotion(
            MLFLOW_TRACKING_URI,
            current_date = current_date,
            model_name = model_name,
            registered_model_version = switch_to_version,
            to_stage="Production"
            )
    else:
        print("$$$ Current is OK $$$")


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


In [15]:
main()

20:30:54.998 | INFO    | prefect.engine - Created flow run 'strict-husky' for flow 'main'
20:30:55.174 | INFO    | Flow run 'strict-husky' - Created task run 'load_data-2ff00c39-0' for task 'load_data'
20:30:55.175 | INFO    | Flow run 'strict-husky' - Executing 'load_data-2ff00c39-0' immediately...


Cannot find file car-prices-2014-11.csv using blank
Getting TRAIN data for 2014-12 period
Getting TRAIN data for 2015-1 period
Getting TRAIN data for 2015-2 period
Getting TRAIN data for 2015-3 period


20:30:59.300 | INFO    | Task run 'load_data-2ff00c39-0' - Finished in state Completed()
20:30:59.342 | INFO    | Flow run 'strict-husky' - Created task run 'na_filter-b6150c66-0' for task 'na_filter'
20:30:59.343 | INFO    | Flow run 'strict-husky' - Executing 'na_filter-b6150c66-0' immediately...
20:31:01.203 | INFO    | Task run 'na_filter-b6150c66-0' - Finished in state Completed()
20:31:01.249 | INFO    | Flow run 'strict-husky' - Created task run 'load_data-2ff00c39-1' for task 'load_data'
20:31:01.250 | INFO    | Flow run 'strict-husky' - Executing 'load_data-2ff00c39-1' immediately...
20:31:01.436 | INFO    | Task run 'load_data-2ff00c39-1' - Finished in state Completed()
20:31:01.482 | INFO    | Flow run 'strict-husky' - Created task run 'na_filter-b6150c66-1' for task 'na_filter'
20:31:01.484 | INFO    | Flow run 'strict-husky' - Executing 'na_filter-b6150c66-1' immediately...


Getting TEST data for 2015-4 period


20:31:01.583 | INFO    | Task run 'na_filter-b6150c66-1' - Finished in state Completed()
20:31:01.896 | INFO    | Flow run 'strict-husky' - Created task run 'prepare_features-fbf67cd7-0' for task 'prepare_features'
20:31:01.898 | INFO    | Flow run 'strict-husky' - Executing 'prepare_features-fbf67cd7-0' immediately...
2022/08/21 20:31:01 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '01710038080e4a0c9a3fe0f3652e2846', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current sklearn workflow
                                 ['condition', 'odometer', 'mmr']),
                                ('cat_imputer',
                                 SimpleImputer(strategy='most_frequent'),
                                 ['body', 'transmission']),
                                ('cat_constant',
                                 SimpleImputer(strategy='most_frequent'),
                                 ['color', 'i

$$$ Training preprocessor... $$$


20:31:09.140 | ERROR   | Flow run 'strict-husky' - Encountered exception during execution:
Traceback (most recent call last):
  File "/homw/winx/.conda/envs/mlops-project-env/lib/python3.9/site-packages/prefect/engine.py", line 566, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/homw/winx/.conda/envs/mlops-project-env/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/homw/winx/.conda/envs/mlops-project-env/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/homw/winx/.conda/envs/mlops-project-env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/homw/winx/.conda/envs/mlops-project-env/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = cont

OperationalError: (sqlite3.OperationalError) disk I/O error
[SQL: PRAGMA journal_mode = WAL;]
(Background on this error at: https://sqlalche.me/e/14/e3q8)

In [10]:
current_date = "2015-06-21"
model_name = "Auction-car-prices-prediction"

test_data = load_data(current_date = "2015-06-21")
X_test, y_test = na_filter(test_data)
X_test["saledate"].max()[:7]





Getting TEST data for 2015-5 period


RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use `task.fn()`.

In [6]:
MLFLOW_TRACKING_URI = 'sqlite:///../mlops-project.db'

# mlflow.end_run()
# mlflow_client.delete_registered_model("Auction-car-prices-prediction")
# mlflow.delete_experiment(experiment_id=5)


# loaded_model = mlflow.pyfunc.load_model(model_uri=model_uri)
# print(loaded_model)
# loaded_model.predict(X_test)

model_name = "Auction-car-prices-prediction"
stage = 'Production'
production_model = mlflow.pyfunc.load_model(model_uri=f'models:/{model_name}/{stage}')

mlflow.pyfunc.loaded_model:
  artifact_path: full-pipeline
  flavor: mlflow.sklearn
  run_id: 9d39355537eb4504954636659de5b1ec



array([ 2713.04427261, 12171.78593638, 13229.49827207, ...,
       26662.07692966, 23770.28814217,  9173.15714517])