In [0]:
import numpy as np
import pandas as pd
import csv
from sklearn.model_selection import train_test_split
from pyspark.sql.functions import *


### installing the requirements

In [2]:
!pip install xgboost    #xgboost==1.7.4

Collecting xgboost
  Downloading xgboost-1.7.5-py3-none-manylinux2014_x86_64.whl (200.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 200.3/200.3 MB 1.4 MB/s eta 0:00:00
Installing collected packages: xgboost
Successfully installed xgboost-1.7.5
You should consider upgrading via the '/home/goli/anaconda3/bin/python3 -m pip install --upgrade pip' command.

In [11]:
#!pip install mlflow 

## Register a model via MLflow

In [5]:
import numpy as np
from xgboost import XGBRegressor
import mlflow
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, mean_squared_error
import math
from sklearn.metrics import r2_score


def RRMSE(true, pred):
    """Relative root-mean-squared error between targets and predictions.

    Computes ``sqrt(sum((true - pred) ** 2) / sum(pred ** 2))``: the squared
    residuals relative to the squared predictions.

    Parameters
    ----------
    true : array-like
        Ground-truth target values (list, ndarray or pandas Series).
    pred : array-like
        Predicted values, same length as ``true``.

    Returns
    -------
    numpy.float64
        The RRMSE. If every prediction is 0 the denominator is 0 and numpy
        returns ``inf``/``nan`` with a RuntimeWarning.
    """
    # Work on plain float arrays so the subtraction is positional. With two
    # pandas Series, ``true - pred`` would align on the index and produce NaN
    # for mismatched labels, which Series.sum then silently skips.
    true_arr = np.asarray(true, dtype=float)
    pred_arr = np.asarray(pred, dtype=float)

    residual_energy = np.sum(np.square(true_arr - pred_arr))
    prediction_energy = np.sum(np.square(pred_arr))

    return np.sqrt(residual_energy / prediction_energy)


def train_register_model(df=None, model_name=None):
    """Train an XGBoost regressor on ``df`` and register it in the MLflow Model Registry.

    Opens an MLflow run named ``model_name``, logs the core hyper-parameters
    explicitly, enables MLflow autologging, fits the model, then logs it under
    ``XGBoost_models/<model_name>`` and registers it as ``model_name``.

    Parameters
    ----------
    df : pandas.DataFrame
        Training data containing the feature columns and a ``target`` column
        (split off by ``prepare_Xy``). The caller's frame is not modified.
    model_name : str
        Used as the run name, the artifact sub-path and the registered model name.

    Raises
    ------
    ValueError
        If ``df`` or ``model_name`` is missing. This is checked before any
        MLflow run is opened, so no empty failed run is left behind.
    """
    if df is None or model_name is None:
        raise ValueError("train_register_model requires both `df` and `model_name`")

    with mlflow.start_run(run_name=model_name):

        params = {'learning_rate': 0.1,
                  'max_depth': 8,
                  'n_estimators': 1000}

        mlflow.log_params(params)  # manual log

        # Enabled before fit() so the training call below is captured.
        mlflow.autolog()  # automatic log

        xg_model = XGBRegressor(**params,
                                min_child_weight=0,
                                gamma=0.7,
                                subsample=0.7,
                                colsample_bytree=0.7,
                                objective='reg:squarederror',
                                nthread=-1,
                                scale_pos_weight=1,
                                # NOTE(review): `seed` and `random_state` are both passed
                                # with different values; which one XGBoost honours is
                                # version-dependent — confirm and keep only one.
                                seed=25,
                                reg_alpha=0.00006,
                                random_state=42)

        # prepare_Xy may remove the `target` column in place; split a copy so
        # the caller's frame stays intact (it can be reused to retrain several
        # models in a row).
        X_train, y_train = prepare_Xy(df.copy())

        xg_model.fit(X_train, y_train)

        print('trining finished ...')

        mlflow.xgboost.log_model(xg_model,
                                 artifact_path="XGBoost_models/%s" % model_name,
                                 registered_model_name="%s" % model_name)

        print('model %s has been registered ' % model_name, '\n')
        
        

In [6]:

def prepare_Xy(df, target_col='target'):
    """Split a DataFrame into a feature frame and a target series without mutating it.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the feature columns plus ``target_col``.
    target_col : str, default 'target'
        Name of the column used as the regression target.

    Returns
    -------
    tuple of (pandas.DataFrame, pandas.Series)
        ``X`` (every column except ``target_col``) and ``y`` (the ``target_col`` column).

    Raises
    ------
    KeyError
        If ``target_col`` is not a column of ``df``.
    """
    # Select and drop rather than ``df.pop``: pop removed the column from the
    # caller's frame, so a second call on the same frame raised KeyError.
    y = df[target_col]
    X = df.drop(columns=[target_col])
    return X, y


## Get the latest version of the model

In [0]:
from collections import defaultdict
import numpy as np
import time
from mlflow.tracking.client import MlflowClient
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus
import mlflow
from collections import defaultdict
import numpy as np

def fetch_model_last_update(model_name):
    """Return the date on which the latest stage-"None" version of a model was logged.

    Looks up the newest registered version of ``model_name`` that has no stage,
    reads the ``mlflow.log-model.history`` tag of the run that produced it, and
    uses the most recent ``utc_time_created`` entry found there.

    Parameters
    ----------
    model_name : str
        Registered model name.

    Returns
    -------
    datetime.date
        Date part of the most recent model-creation timestamp in the run.

    Raises
    ------
    ValueError
        If the model has no version without a stage, or the run's log-model
        history has no ``utc_time_created`` entry.
    KeyError
        If the run carries no ``mlflow.log-model.history`` tag.
    """
    import json
    from datetime import date

    client = MlflowClient()

    versions = client.get_latest_versions(model_name, stages=["None"])
    if not versions:
        raise ValueError('No stage-"None" version found for model %r' % model_name)

    run = mlflow.get_run(versions[0].run_id)

    # The tag is parsed as a JSON list with one entry per model logged in the
    # run (e.g. an autologged model plus the explicitly logged one).
    history = json.loads(run.data.tags['mlflow.log-model.history'])
    created = [entry['utc_time_created'] for entry in history
               if entry.get('utc_time_created')]
    if not created:
        raise ValueError('No model creation time recorded for model %r' % model_name)

    # Assumes timestamps start with an ISO "YYYY-MM-DD" date, so the
    # lexicographic maximum is the most recent one.
    model_last_update = date.fromisoformat(max(created)[:10])

    print('Model:', model_name, '  last training date:', model_last_update.isoformat())

    return model_last_update



In [10]:
from mlflow import MlflowClient
from datetime import date, datetime
import warnings
warnings.simplefilter(action='ignore')
import time, threading



def test_prediction( df_test, model_name ):


    client = MlflowClient() 

    vf= client.get_latest_versions(group, stages=["None"])
  
    model_test = mlflow.pyfunc.load_model(model_uri="models:/%s/%s"% (model_name, vf[0].version)) 

    
    X_test, y_test= prepare_Xy(df_test)
                      

    y_pred= np.array(model_test.predict(X_test))

    rrmse= RRMSE(y_test, y_pred)
    r2= r2_score( y_test, y_pred)

    return rrmse, r2

In [9]:
import mlflow
import numpy as np
from xgboost import XGBRegressor
import mlflow
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, mean_squared_error
import math
from sklearn.metrics import r2_score
from mlflow import MlflowClient


def model_exist(model_name):
    """Return True if ``model_name`` has at least one registered version without a stage.

    Parameters
    ----------
    model_name : str
        Registered model name to look up.

    Returns
    -------
    bool
        False when the registry lookup raises (e.g. the model is not
        registered) or returns no stage-"None" version; True otherwise.
    """
    client = MlflowClient()

    try:
        print('existence check ...')
        versions = client.get_latest_versions(model_name, stages=["None"])
    # `Exception` rather than a bare `except:` so KeyboardInterrupt/SystemExit
    # still propagate. NOTE(review): narrowing this to
    # mlflow.exceptions.MlflowException would stop connectivity errors from
    # being reported as a missing model.
    except Exception:
        print(model_name, "***** Model does not exist ****** ", '\n')
        return False

    if not versions:
        # Registered, but with nothing in stage "None": callers that load
        # versions[0] of this stage could not use it.
        print(model_name, "***** Model does not exist ****** ", '\n')
        return False

    return True



def update_data():
     """Re-run the extraction and preprocessing notebooks to refresh the dataset.

     NOTE(review): presumably these notebooks rewrite the CSV data that
     fetch_new_data() re-reads right after calling this — confirm.
     """
     # NOTE(review): `%run` inside a function body relies on IPython's input
     # transformer. Databricks requires `%run` to sit alone in its own cell, so
     # there the dbutils.notebook.run calls below are the in-function
     # alternative (they run the child notebook in a separate context) —
     # confirm the target environment.
     %run ./Extract_data_and_merging_
     %run ./preprocessing_feature_engineering_

    #dbutils.notebook.run("Extract_data_and_merging_", 60)
    #dbutils.notebook.run("preprocessing_feature_engineering_", 60)


def fetch_new_data(data_path="/dbfs/mnt/data"):
    """Refresh the dataset and return the records the refresh added.

    The current CSV data is loaded and collected into pandas *before*
    ``update_data()`` runs. It is then loaded again afterwards and the two
    snapshots are compared.

    Parameters
    ----------
    data_path : str, default "/dbfs/mnt/data"
        Location passed to ``spark.read.csv`` for both loads.

    Returns
    -------
    tuple of (pandas.DataFrame, pandas.DataFrame)
        ``(new_records, updated_dataset)``.

        - ``new_records`` holds the rows of the refreshed data that were not
          present before. When the refresh added no rows, it is the previous
          dataset instead, so callers always have data to evaluate on.
        - ``updated_dataset`` is the full refreshed data.
    """
    # Collect to pandas before the refresh: Spark reads are lazy, so a Spark
    # DataFrame collected afterwards would already reflect the new files.
    # NOTE(review): Spark APIs usually address DBFS as "dbfs:/mnt/..." while
    # "/dbfs/..." is the local FUSE path — confirm this path resolves.
    old_dataset = _read_csv_as_pandas(data_path)

    print('old data has been loaded ...')

    update_data()

    updated_dataset = _read_csv_as_pandas(data_path)

    print('updated data has been loaded  ...')

    if updated_dataset.equals(old_dataset):
        return old_dataset, updated_dataset

    new_records = _rows_not_in(updated_dataset, old_dataset)
    if new_records.empty:
        # Rows were only removed or reordered: nothing new to score.
        return old_dataset, updated_dataset

    print('new data records ready ...')

    return new_records, updated_dataset


def _read_csv_as_pandas(path):
    """Load a headered, comma-delimited CSV with Spark and collect it into pandas."""
    return (spark.read
            .options(header='True', inferSchema='True', delimiter=',')
            .csv(path)
            .toPandas())


def _rows_not_in(frame, reference):
    """Return the rows of ``frame`` absent from ``reference`` (left anti-join on shared columns)."""
    shared = [col for col in frame.columns if col in reference.columns]
    if not shared:
        return frame.reset_index(drop=True)
    # De-duplicate the reference so the left join cannot multiply rows.
    merged = frame.merge(reference[shared].drop_duplicates(), on=shared,
                         how='left', indicator='_row_origin')
    return (merged[merged['_row_origin'] == 'left_only']
            .drop(columns='_row_origin')
            .reset_index(drop=True))



def model_drift_detector(model_list, rrmse_threshold=0.2, r2_threshold=0.9):
    """Evaluate registered models on freshly fetched data and retrain those that drifted.

    Fetches the new records once. Then, for every name in ``model_list`` for
    which ``model_exist`` is true, scores the latest version on the new
    records. When RRMSE exceeds ``rrmse_threshold`` *and* R2 falls below
    ``r2_threshold``, the model is retrained on the full updated dataset and
    registered again. Names for which ``model_exist`` is false are skipped.

    Parameters
    ----------
    model_list : iterable of str
        Registered model names to check.
    rrmse_threshold : float, default 0.2
        RRMSE above which a model is considered degraded.
    r2_threshold : float, default 0.9
        R2 below which a model is considered degraded.
    """
    # fetch_new_data() takes no positional data argument.
    new_records, updated_dataset = fetch_new_data()

    for model_name in model_list:
        if not model_exist(model_name):
            continue

        # Pass copies: the helpers may split `target` off in place, and the
        # same frames are reused for every model in the list.
        test_rrmse, test_r2 = test_prediction(new_records.copy(), model_name)

        print('Test RRMSE:', test_rrmse, '   R2:', test_r2)

        # NOTE(review): both metrics must degrade to trigger retraining;
        # confirm that `and` (rather than `or`) is the intended policy.
        if test_rrmse > rrmse_threshold and test_r2 < r2_threshold:
            print('Drift detected: model should be retrained...')
            train_register_model(updated_dataset.copy(), model_name)
        else:
            print('No drift detected ...', '\n')
      
     


def monitor_model(models=None, interval_seconds=2629800, max_checks=None):
    """Run the drift detector periodically; by default forever, once per average month.

    Parameters
    ----------
    models : iterable of str, optional
        Model names to check. When omitted, the notebook-global ``model_list``
        is looked up on every check (so reassigning it takes effect).
    interval_seconds : float, default 2629800
        Pause between consecutive checks. 2629800 s is 365.25 / 12 days,
        i.e. an average month.
    max_checks : int, optional
        Stop after this many checks. ``None`` (the default) loops forever.
    """
    checks_done = 0
    while max_checks is None or checks_done < max_checks:
        if checks_done:
            time.sleep(interval_seconds)  # monthly check by default
        # NOTE(review): without `models`, a global `model_list` must exist.
        model_drift_detector(models if models is not None else model_list)
        checks_done += 1

#monitor_model() 


