# Production MLFlow pipeline

## Import model training functions & library

In [0]:
%run /ADR/codebase/prod/adr_classifier_model_train_functions

In [0]:
%run /ADR/codebase/prod/adr_clustering_model_train_functions

In [0]:
import json
import os
import re
from datetime import datetime

import numpy
import numpy as np
from numpy import savetxt
import pandas as pd
from pandas import json_normalize
import mlflow
import mlflow.pyfunc
import mlflow.sklearn
import mlflow.tracking
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

## Set MLFlow Experiment Name

In [0]:
exp_name = '/ADR/MLFlow_experiments/MLFlow_exp_2'  # Can point to current notebook
artif_loc = 'dbfs:/FileStore/FileStore/'  # artifact root used only when the experiment is first created

# Create the experiment on first use so it gets our artifact location, then ALWAYS make it
# the active experiment. Previously the first run only created it and never activated it,
# so mlflow.start_run() logged into the default experiment instead.
if mlflow.get_experiment_by_name(exp_name) is None:
  mlflow.create_experiment(exp_name, artifact_location=artif_loc)
mlflow.set_experiment(exp_name)

experiment = mlflow.get_experiment_by_name(exp_name)
experiment

## Train Clustering Model

In [0]:
def gen_clusters():
  """Train the pool clustering model from the pool-definition CSV.

  Configuration is read from notebook globals populated by the %run'd config/function
  notebooks: ``filenames['clustering']``, ``datasources['SupplyBOAnalysis']`` (feature
  columns), ``model_config['clustering']`` (``algorithm``, ``no_clusters``,
  ``pool_data_snapshot``) and ``datapath``.

  Returns:
    tuple: (pool_df, pool_dict_df, cluster_obj, k)
      - pool_df: Spark DataFrame built from save_cluster's first result.
      - pool_dict_df: Spark DataFrame whose "Cluster" column is cast to string and
        renamed to "POOL_CLUSTER".
      - cluster_obj: the object returned by kmeans()/agglomerative().
      - k: the configured number of clusters.

  Raises:
    KeyError: a required config key is missing, or a configured feature column is
      absent from the input data.
    ValueError: ``algorithm`` is neither 'kmeans' nor 'agglomerative'.
    FileNotFoundError: re-raised if raised while reading / processing the input.
  """
  print("Begin Clustering ....")

  # --- Configuration (only config lookups are labelled as config errors) ---
  try:
    filenames_config = filenames['clustering']
    featurecols = list(datasources['SupplyBOAnalysis'])
    cluster_config = model_config['clustering']
    clust_algo = cluster_config['algorithm']
    k = cluster_config['no_clusters']
    pool_dt = cluster_config['pool_data_snapshot']
    pool_def_file = filenames_config['in']['pool_def_data']
  except KeyError:
    print("Error while reading config file")
    raise

  # Fail fast: an unsupported algorithm used to surface only as an unbound-name error
  # after all the data preparation below had already run.
  if clust_algo not in ('kmeans', 'agglomerative'):
    raise ValueError("Unsupported clustering algorithm: {0!r}".format(clust_algo))

  try:
    # Read the pool definition data (`datapath` comes from the 'paths' config notebook)
    df = spark.read.format('csv').load(datapath + '/in/raw/pool/' + pool_def_file, header='true')
    raw_data = faster_toPandas(df)

    # Every configured feature column must be present. Report all missing ones, then
    # stop with an explicit error instead of a bare KeyError from data[featurecols].
    missing_cols = [col for col in featurecols if col not in raw_data.columns]
    for featurecol in missing_cols:
      print(featurecol, " missing from input data")
    if missing_cols:
      raise KeyError("Feature columns missing from input data: {0}".format(missing_cols))

    # Keep records with POOLTYPE == 'RP' that are not flagged 'To be deleted'.
    # na=False: a null POOL_NAME would otherwise make `~` fail on an object-dtype mask.
    to_be_deleted = raw_data['POOL_NAME'].str.startswith('To be deleted', na=False)
    data = raw_data.loc[(raw_data['POOLTYPE'] == 'RP') & ~to_be_deleted].copy()

    # Select only the configured feature columns and impute missing values
    datain = data[featurecols]
    data_clean = dataimputer(datain, pool_dt)

    # Every feature except the pool name is a categorical column to encode
    feat_cols_to_encode = featurecols.copy()
    feat_cols_to_encode.remove('POOL_NAME')

    # Build features df required for clustering
    pool_feat_data_df, cluster_input = build_features(data_clean, feat_cols_to_encode, pool_dt)

    # Cluster (algorithm already validated above)
    if clust_algo == 'kmeans':
      cluster_obj = kmeans(cluster_input, k, pool_dt)
      ctype = 'km'
    else:
      cluster_obj = agglomerative(cluster_input, k, pool_dt)
      ctype = 'ahc'

    # Save cluster objects and expose them as Spark DataFrames
    pool_pd_df, pool_dict_pd_df = save_cluster(cluster_obj, data, pool_feat_data_df, ctype, pool_dt)
    pool_df = convert_to_spark(pool_pd_df)
    pool_dict_df = convert_to_spark(pool_dict_pd_df)
    pool_dict_df = pool_dict_df.withColumn("Cluster", pool_dict_df["Cluster"].cast(StringType()))
    pool_dict_df = pool_dict_df.withColumnRenamed("Cluster", "POOL_CLUSTER")
  except FileNotFoundError:
    print("Missing input file")
    raise

  print("Finished Clustering")
  print("END Clusering")
  return pool_df, pool_dict_df, cluster_obj, k

## Train Classifier Model

In [0]:
# Run timestamp as a string (YYYY_MM_DD_HHMMSS). It is not used in this notebook's own
# code; NOTE(review): helper functions from the %run notebooks may read `timestr` as a global.
today = datetime.now()
timestr = f"{today:%Y_%m_%d_%H%M%S}"
def train_adr_classifier(raw_data):
  """Train the ADR pool classifier end to end and log everything to MLflow.

  Steps:
    1. Build pool clusters with gen_clusters().
    2. Preprocess, filter and impute the demand data.
    3. Feature-engineer: numerical features, one-hot encoding and free-text features
       (handle_text_cols).
    4. Fit the classifier configured under model_config['classifier']['ml'].
    5. Log params and metrics, the classifier, and every preprocessing / lookup object
       that the serving wrapper reloads at prediction time.

  Args:
    raw_data: demand data as returned by read_data() (type defined by that helper).

  Returns:
    str: id of the MLflow run that holds all logged artifacts.
  """
  # Generate pool clusters required for classifier
  pool_df, pool_dict_df, cluster_obj, k = gen_clusters()

  # Preprocess the demand data
  print("Preprocess demand data")
  raw_data_ack_bfr_col_df = preprocessing(raw_data, pool_dict_df)
  model_ohe_col, train_test_df, valid_df = filter_data(raw_data_ack_bfr_col_df, basic_config, features)
  ranked_jr_comp_df, ranked_comp_jr_df, train_test_imputed_df = impute_missing_values(train_test_df)

  # Feature Engineering
  final_numerical_df = numerical_features(train_test_imputed_df)
  final_numerical_spark_df = convert_to_spark(final_numerical_df)
  train_test_imputed_pd_df = faster_toPandas(train_test_imputed_df, 100)
  enc, one_hot_col, cat_labels, clean_model_ohe_final_df = pd_ohe_features(train_test_imputed_pd_df, model_ohe_col)

  clean_model_ohe_final_spark_df = convert_to_spark(clean_model_ohe_final_df)
  ohe_text_num_merge_df = merge_df(train_test_imputed_df, clean_model_ohe_final_spark_df, final_numerical_spark_df)
  ohe_text_num_merge_df.cache()
  ohe_text_num_merge_pandas_df = spark_to_pandas(ohe_text_num_merge_df)
  # The Spark copy is not used again after collection; release its cached blocks.
  ohe_text_num_merge_df.unpersist()
  X_train, X_test, y_train, y_test = data_split(ohe_text_num_merge_pandas_df)

  # Scale the numerical feature: the scaler comes from minmaxscalar(X_train) and is only
  # applied (transform) to the test split.
  scaler, X_train_num = minmaxscalar(X_train)
  X_test_num = X_test.copy()
  X_test_num[['WE_DURATION']] = scaler.transform(X_test[['WE_DURATION']])

  (vectorizer_object_apr_title, nmf_object_apr_title,
   vectorizer_object_assign_title, nmf_assign_title,
   X_train_final, X_test_final, y_train_final, y_test_final) = handle_text_cols(X_train_num, X_test_num, y_train, y_test)
  y_train_final['POOL_CLUSTER'] = y_train_final['POOL_CLUSTER'].astype('str')
  y_test_final['POOL_CLUSTER'] = y_test_final['POOL_CLUSTER'].astype('str')

  # Collect lookup tables as pandas so they can be logged as MLflow artifacts below
  ranked_jr_comp_pd_df = spark_to_pandas(ranked_jr_comp_df)
  ranked_comp_jr_pd_df = spark_to_pandas(ranked_comp_jr_df)
  pool_pd_df = spark_to_pandas(pool_df)
  pool_dict_pd_df = spark_to_pandas(pool_dict_df)

  # Start an MLflow run; the "with" block ensures the run is closed even if this fails
  print("Begin MLFlow train run")
  with mlflow.start_run() as mlflow_run:
    ml_config = model_config['classifier']['ml']
    model_name = ml_config['algorithm']
    algo_config = ml_config[model_name]
    n_estimators = algo_config['n_estimators']
    criterion = algo_config['criterion']
    random_state = algo_config['random_state']
    min_samples_leaf = algo_config['min_samples_leaf']
    min_samples_split = algo_config['min_samples_split']
    # NOTE(review): column 0 is dropped for training; presumably the APR_ID identifier — confirm.
    classifier, test_acc, train_acc, y_pred, y_pred_train = rf_model_function(
        X_train_final.iloc[:, 1:], X_test_final.iloc[:, 1:], y_train_final, y_test_final,
        n_estimators, criterion, random_state, min_samples_leaf, min_samples_split)
    train_acc = np.round(100*train_acc, 2)
    test_acc = np.round(100*test_acc, 2)
    print("Train Acc" + "-->", train_acc)
    print("Test Acc" + "-->", test_acc)

    # TODO: validation-set evaluation is disabled; `valid_df` is currently unused.
    # print("Perform model validation")
    # valid_imputed_df = valid_impute_missing_values(ranked_jr_comp_df, ranked_comp_jr_df, valid_df)
    # valid_imputed_pd_df = faster_toPandas(valid_imputed_df, 100)
    # valid_clean_model_ohe_df = valid_ohe_features(valid_imputed_pd_df, enc, model_ohe_col)
    # valid_clean_model_ohe_spark_df = convert_to_spark(valid_clean_model_ohe_df)
    # valid_final_numerical_df = valid_numerical_features(scaler, valid_imputed_df)
    # valid_final_numerical_spark_df = convert_to_spark(valid_final_numerical_df)
    # valid_ohe_text_num_merge_df = merge_df(valid_imputed_df, valid_clean_model_ohe_spark_df, valid_final_numerical_spark_df)
    # valid_ohe_text_num_merge_pandas_df = spark_to_pandas(valid_ohe_text_num_merge_df)
    # apr_pool_df = valid_imputed_pd_df[['APR_ID', 'POOL_CLUSTER']].drop_duplicates().reset_index(drop = True)
    # valid_ohe_text_num_merge_pandas_df = pd.merge(valid_ohe_text_num_merge_pandas_df, apr_pool_df, on = 'APR_ID', how = 'inner', validate = '1:1')
    # X = valid_ohe_text_num_merge_pandas_df.loc[:, valid_ohe_text_num_merge_pandas_df.columns != 'POOL_CLUSTER']
    # y = valid_ohe_text_num_merge_pandas_df['POOL_CLUSTER']
    # X_final = free_text_test_transform(vectorizer_object_apr_title, nmf_object_apr_title, vectorizer_object_assign_title, nmf_assign_title, X)
    # y_final = pd.DataFrame(y).reset_index(drop = True)
    # y_final = y_final.astype('str')
    # y_pred_valid = pd.DataFrame(classifier.predict(X_final.iloc[:,1:]), columns=['Model_Prediction'])
    # valid_acc = accuracy_score(y_final,y_pred_valid)
    # valid_acc = np.round(100*valid_acc,2)
    # print("Validation Acc" + "-->" , valid_acc)
    # mlflow.log_metric("valid_accuracy", valid_acc)

    print("Logging to MLFlow")
    # Params / metrics shown in the MLflow UI
    mlflow.log_param("no_clusters", k)
    mlflow.log_params({"train_rows": X_train.shape[0],
                       "train_cols": X_train.shape[1]})
    model_params = {"n_estimators": n_estimators,
                    "criterion": criterion,
                    "random_state": random_state,  # logged so the run is reproducible
                    "min_samples_leaf": min_samples_leaf,
                    "min_samples_split": min_samples_split}
    mlflow.log_params(model_params)

    mlflow.log_metric("train_accuracy", train_acc)
    mlflow.log_metric("test_accuracy", test_acc)

    mlflow.sklearn.log_model(classifier, "classifier-model")

    # Lookup tables are pandas DataFrames stored via the sklearn flavour; the serving
    # wrapper reloads them with mlflow.sklearn.load_model under these exact names.
    mlflow.sklearn.log_model(ranked_jr_comp_pd_df, "Impute_Jobrole_with_Comp_Mode")
    mlflow.sklearn.log_model(ranked_comp_jr_pd_df, "Impute_Comp_with_Jobrole_Mode")
    mlflow.sklearn.log_model(pool_pd_df, "pool_df")
    mlflow.sklearn.log_model(pool_dict_pd_df, "pool_dict_pd_df")

    mlflow.sklearn.log_model(cluster_obj, "clustering")
    mlflow.sklearn.log_model(scaler, "minmax")
    mlflow.sklearn.log_model(enc, "OneHotEncoder")
    mlflow.sklearn.log_model(vectorizer_object_apr_title, "vectorizer_object_apr_title")
    mlflow.sklearn.log_model(vectorizer_object_assign_title, "vectorizer_object_assign_title")
    mlflow.sklearn.log_model(nmf_object_apr_title, "nmf_object_apr_title")
    mlflow.sklearn.log_model(nmf_assign_title, "nmf_assign_title")
    run_id = mlflow_run.info.run_id
  return run_id

In [0]:
# Load the raw demand data used for training.
# NOTE(review): read_data is not defined in this notebook; presumably it is provided by one
# of the %run notebooks at the top — confirm.
raw_data = read_data()

In [0]:
# Train clustering + classifier and log all artifacts to MLflow. The returned run id is
# used by the model-packaging and registration cells below.
run_id = train_adr_classifier(raw_data)
run_id

## Pre- & post-processing functions for the production model

In [0]:
# Define the model class
class RF_with_preprocess(mlflow.pyfunc.PythonModel):
    """MLflow pyfunc wrapper: request preprocessing, classifier prediction, pool recommendation.

    The fitted preprocessing objects (imputation tables, scaler, one-hot encoder, text
    vectorizers / NMF models) and the pool lookup tables are not pickled inside this
    wrapper. They are loaded with ``mlflow.sklearn.load_model`` from sub-directories of the
    artifact path. Each one is cached on the instance after the first load, so artifacts
    are read from disk once per served model instead of on every request.
    """

    # Saved-model directory inside the serving container; used when no artifact_path is given.
    DEFAULT_ARTIFACT_PATH = '/opt/mlflow/model/'

    def __init__(self, trained_rf, artifact_path=None, debug=False):
        """
        Args:
          trained_rf: fitted classifier; its ``predict`` output is used as the pool cluster.
          artifact_path: directory holding the preprocessing artifacts saved next to this
            pyfunc model. None means DEFAULT_ARTIFACT_PATH (useful to override for local tests).
          debug: if True, print each raw request payload. It may contain personal data,
            so it is off by default.
        """
        self.rf = trained_rf
        self.artifact_path = artifact_path
        self.debug = debug
        self._artifact_cache = {}

    def _load_artifact(self, artifact_path, name):
        """Return the mlflow.sklearn artifact stored at ``artifact_path + "/" + name``, loading it once."""
        # setdefault keeps this working for instances pickled before the cache existed
        cache = self.__dict__.setdefault('_artifact_cache', {})
        key = (artifact_path, name)
        if key not in cache:
            cache[key] = mlflow.sklearn.load_model(artifact_path + "/" + name)
        return cache[key]

    def explode(self, df, we_df, lst_cols):
        """Explode the list-valued ``lst_cols`` of ``df`` and append ``we_df`` column-wise.

        Each listed column is exploded separately and the pieces are aligned on
        (original row, position within the list). ``we_df`` is then concatenated
        side by side, and gaps are forward-filled. Not used by the current prediction path.
        """
        exp_lst = []
        exp_df = df.copy()
        for col in lst_cols:
            exp_col = exp_df[[col]].explode(col)
            exp_col = exp_col.set_index(exp_col.groupby(level=0).cumcount(), append=True)
            exp_lst.append(exp_col)

        df = pd.concat(exp_lst, axis=1).reset_index(drop=True, level=1)
        df.reset_index(drop=True, inplace=True)
        df = pd.concat([df, we_df], axis=1)
        df = df.ffill()
        return df

    def preprocess_input(self, input_data, artifact_path):
        """Turn the raw request payload into the classifier's feature matrix.

        Steps: impute missing values with the logged mode tables, one-hot encode,
        scale numerical features, merge the pieces, then add the free-text features.

        Returns:
          tuple: (model_input, prod_raw_df). model_input is the transformed feature
          frame; prod_raw_df is ``pd.DataFrame(input_data)``.
        """
        if getattr(self, 'debug', False):
            print("DEBUG")
            print(input_data)
            print("Length :{0}, Type :{1}".format(len(input_data), type(input_data)))
            print("DEBUG_END")
        prod_raw_df = pd.DataFrame(input_data)

        # Fitted preprocessing objects logged during training (cached after first load)
        ranked_jr_comp_pd_df = self._load_artifact(artifact_path, "Impute_Jobrole_with_Comp_Mode")
        ranked_comp_jr_pd_df = self._load_artifact(artifact_path, "Impute_Comp_with_Jobrole_Mode")
        scaler = self._load_artifact(artifact_path, "minmax")
        enc = self._load_artifact(artifact_path, "OneHotEncoder")
        nmf_assign_title = self._load_artifact(artifact_path, "nmf_assign_title")
        nmf_object_apr_title = self._load_artifact(artifact_path, "nmf_object_apr_title")
        vectorizer_object_apr_title = self._load_artifact(artifact_path, "vectorizer_object_apr_title")
        vectorizer_object_assign_title = self._load_artifact(artifact_path, "vectorizer_object_assign_title")

        clean_prod_test_demand_pd_df = prod_impute_missing_values(ranked_jr_comp_pd_df, ranked_comp_jr_pd_df, prod_raw_df)
        model_ohe_col = features['model_ohe_var']
        prod_clean_model_ohe_df = valid_ohe_features(clean_prod_test_demand_pd_df, enc, model_ohe_col)
        prod_final_numerical_df = prod_numerical_features(scaler, clean_prod_test_demand_pd_df)
        X = pd_merge_df(clean_prod_test_demand_pd_df, prod_clean_model_ohe_df, prod_final_numerical_df)

        X_final = free_text_test_transform(vectorizer_object_apr_title, nmf_object_apr_title,
                                           vectorizer_object_assign_title, nmf_assign_title, X)
        return X_final.copy(), prod_raw_df

    def post_process_result(self, model_input, model_result, prod_raw_df, artifact_path):
        """Turn the predicted cluster into a list of recommended pool IDs.

        Input:  1. model_input - feature frame from preprocess_input (must contain APR_ID)
                2. model_result - DataFrame with a 'Model_Prediction' column (predicted cluster)
                3. prod_raw_df - the raw request as a DataFrame
                4. artifact_path - directory holding the 'pool_df' / 'pool_dict_pd_df' artifacts
        Output: {"SUGGESTED_POOLS": [...]} with up to 5 pool IDs. A recommended pool name
                with no POOL_ID mapping is returned as the name itself.

        Rules: use the demand's first job role (among its first two) that exists in the pool
        data. If the predicted cluster is not among that job role's top clusters in
        genclustrank's output, use that job role's first top cluster instead.
        """
        pool_df = self._load_artifact(artifact_path, 'pool_df')
        pool_clust_dict_df = self._load_artifact(artifact_path, 'pool_dict_pd_df')

        # Align predictions with inputs by position (model_result has a fresh RangeIndex)
        prod_rec_df = pd.concat([model_input.reset_index(drop=True), model_result], axis=1)[['APR_ID', 'Model_Prediction']].copy()
        prod_rec_df['Model_Prediction'] = prod_rec_df['Model_Prediction'].astype('str')
        apr_id = prod_rec_df['APR_ID'].iloc[0]

        # A demand can have multiple job roles
        demand_prod_jr = list(prod_raw_df.loc[prod_raw_df['APR_ID'] == apr_id]['APR_JOBROLE_NAME'].unique())
        pool_jr = set(pool_df['JOBROLENAME'].unique())
        cluster = list(prod_rec_df.loc[prod_rec_df['APR_ID'] == apr_id]['Model_Prediction'])[0]
        cluster_digits = re.findall("\\d+", cluster)
        if not cluster_digits:
            raise ValueError("Cannot parse cluster number from prediction {0!r}".format(cluster))
        clust = int(cluster_digits[0])

        # Data structure for the cluster filter
        jr_clust_rank_dict, jr_clust_rank_df = genclustrank(pool_df, 10)

        # First of the demand's first two job roles known to the pool data; otherwise its first
        # job role; None if the demand lists no job role (previously an unbound-name crash).
        fallback_jr = demand_prod_jr[0] if demand_prod_jr else None
        JR_final = next((jr for jr in demand_prod_jr[:2] if jr in pool_jr), fallback_jr)

        if JR_final in pool_jr:
            # Recommend final cluster to the input APR
            if JR_final in jr_clust_rank_dict and clust not in jr_clust_rank_dict[JR_final]['top']:
                rec_clust = jr_clust_rank_dict[JR_final]['top'][0]
            else:
                rec_clust = clust
            # Recommend final pools to the input APR
            recommended_pools = recpools(pool_df, pool_clust_dict_df, str(rec_clust), JR_final, 5)
        else:
            rec_clust = clust
            recommended_pools = list(pool_clust_dict_df.loc[pool_clust_dict_df['POOL_CLUSTER'] == str(rec_clust)]['POOL_NAME'].head(5))

        # Production record + predictions for analysis
        # TODO: Check with DAP team to build a table for saving the data (prod_rec_df is not persisted yet).
        prod_rec_df.loc[prod_rec_df['APR_ID'] == apr_id, 'Job Role'] = str(JR_final)
        prod_rec_df.loc[prod_rec_df['APR_ID'] == apr_id, 'Recommended Cluster'] = str(rec_clust)
        prod_rec_df.loc[prod_rec_df['APR_ID'] == apr_id, 'Recommended Pools'] = str(recommended_pools)

        # POOL_NAME -> POOL_ID lookup for resource pools (1:1 mapping assumed by the original code)
        cluster_dict_df = pool_df.loc[pool_df['POOLTYPE'] == 'RP'][['POOL_ID', 'POOL_NAME']].drop_duplicates().reset_index(drop=True)
        name_to_pool_id = dict(zip(cluster_dict_df['POOL_NAME'], cluster_dict_df['POOL_ID']))

        rec_pool_id = [name_to_pool_id.get(item, item) for item in recommended_pools]
        return {"SUGGESTED_POOLS": rec_pool_id}

    def predict(self, context, model_input):
        """pyfunc entry point: preprocess the request, classify, and return suggested pools."""
        artifact_path = getattr(self, 'artifact_path', None) or self.DEFAULT_ARTIFACT_PATH
        processed_model_input, prod_raw_df = self.preprocess_input(model_input, artifact_path)
        # NOTE(review): column 0 is skipped as in training (.iloc[:, 1:]); presumably APR_ID — confirm.
        model_result = pd.DataFrame(self.rf.predict(processed_model_input.iloc[:, 1:]), columns=['Model_Prediction'])
        return self.post_process_result(processed_model_input, model_result, prod_raw_df, artifact_path)

In [0]:
# Conda environment recorded with the pyfunc model; all pip package versions are pinned.
conda_env_dict = dict(
    channels=["defaults", "conda-forge"],
    dependencies=[
        "python=3.7.6",
        "pip",
        {"pip": ["mlflow==1.11.0",
                 "cloudpickle==1.3.0",
                 "pyspark==3.0.1",
                 "scikit-learn==0.22.1",
                 "pandas==1.0.1",
                 "numpy==1.18.1",
                 "matplotlib==3.1.3"]},
    ],
    name="mlflow-env",
)

In [0]:
# Construct and save final pyfunc model for production deployment.
# Resolve the run's artifact root from the tracking server instead of assuming
# artif_loc + run_id: the two only coincide when this notebook created the experiment.
artifact_path = mlflow.tracking.MlflowClient().get_run(run_id).info.artifact_uri.rstrip('/') + '/'
model_path_preproc = artifact_path + 'classifier-preproc'
dbutils.fs.rm(model_path_preproc, True)  # remove folder if already exists

# Read the classifier model logged during training
logged_model = artifact_path + 'classifier-model'
loaded_model = mlflow.sklearn.load_model(logged_model)

# Preprocessing / lookup artifacts logged during training. RF_with_preprocess reloads each
# one from <model dir>/<name> at serving time, so these names must match the ones it uses.
PREPROC_ARTIFACT_NAMES = [
    'Impute_Jobrole_with_Comp_Mode',
    'Impute_Comp_with_Jobrole_Mode',
    'pool_df',
    'pool_dict_pd_df',
    'minmax',
    'OneHotEncoder',
    'nmf_assign_title',
    'nmf_object_apr_title',
    'vectorizer_object_apr_title',
    'vectorizer_object_assign_title',
]
preproc_artifacts = {name: mlflow.sklearn.load_model(artifact_path + name) for name in PREPROC_ARTIFACT_NAMES}

# Save the pyfunc wrapper through the DBFS FUSE mount, then copy each artifact (and the
# bare classifier) into sub-directories of the saved model so they ship with it.
rf_preprocess_model = RF_with_preprocess(trained_rf=loaded_model)
local_model_dir = model_path_preproc.replace("dbfs:", "/dbfs")
mlflow.pyfunc.save_model(path=local_model_dir, python_model=rf_preprocess_model,
                         conda_env=conda_env_dict)

for name, artifact in preproc_artifacts.items():
    mlflow.sklearn.save_model(artifact, local_model_dir + "/" + name)

mlflow.sklearn.save_model(loaded_model, local_model_dir + "/classifier-model")

## Register Model to production registry

In [0]:
%run /DAPLibraries/DAPMLFlowClient

In [0]:
# Register the pyfunc model saved under this run's "classifier-preproc" artifact directory
# as a new version of the "ADR Pool Recommender" registered model.
# NOTE(review): DAPMLFlowClient comes from the %run DAPLibraries notebook above; the
# create_model_version(name, run_id, artifact_subpath) argument meaning is assumed — confirm.
# Replace usecase-name with name of the usecase provided by DAP Team
dap_mlflow_client = DAPMLFlowClient() 
print("Registering model version for run with run id:", run_id)
dap_mlflow_client.create_model_version("ADR Pool Recommender", run_id, "classifier-preproc")

In [0]:
# dap_mlflow_client.client.transition_model_version_stage("ADR Pool Recommender",14,stage='Production')

## Test the MLFlow pyfunc model

In [0]:
# # Sample input data from API request
# input_data ={
# "APR_ID":"APR-298480",
#   "APR_JOBROLE_NAME":["xyz","ADM Delivery Manager", "Data Scientist","Analyst"],
#   "COMP_DOMAIN":["Business Support System (BSS)", "IP & Transport", "abc"],
#   "ASSIGNMENTSERVICEFLOW":["DGS", "ADM", "def"],
#   "ASSIGNMENTTYPEDESC":"Internal Assignment",
#   "APR_DELIVERY_TYPE":"RBD",
#   "APR_TITLE":"CS2BEAM",
#   "PROJECT_NAME":"SOMIG CS2BEAM",
#   "ONSITE_DEL_LOCATION":["India", "Iran"],
#   "CUSTOMER_UNIT_NAME":["Airtel", "TMobile"],
#   "RESOURCE_SOURCE_SF":["Build", "ADM"],
#   "RESOURCE_SOURCE_TYPE":["Global", "MELA"],
#   "WORKEFFORTS": [
#    { "STARTDATE": "2020-07-24", " ENDDATE": "2020-08-31"," DURATION": "27", " TYPE": " Percent"," FTE_PERCENT": "100", " EFFORT": "246.4" },
#    { "STARTDATE": "2020-07-20", " ENDDATE": "2020-07-24"," DURATION": "50", " TYPE": " Percent"," FTE_PERCENT": "100", " EFFORT": "216.0" }]
# }

In [0]:
# input_data2 = {"columns":["ASSIGNMENTSERVICEFLOW","APR_TITLE","PROJECT_NAME","ONSITE_DEL_LOCATION","RESOURCE_SOURCE_SF","RESOURCE_SOURCE_TYPE","ASSIGNMENTTYPEDESC","APR_DELIVERY_TYPE","CUSTOMER_UNIT_NAME","APR_ID","COMP_DOMAIN","APR_JOBROLE_NAME","WE_STARTDATE","WE_ENDDATE","WE_DURATION","WE_TYPE","WE_FTE_PERCENT","WORK_EFFORT"],"data":[["DGS","CS2BEAM","SOMIG CS2BEAM","India","Build","Global","Internal Assignment","RBD","TMobile","APR-298480","Business Support System (BSS)","ADM Delivery Manager","2020-07-24","2020-08-31",27," Percent",100,246.4],["ADM","CS2BEAM","SOMIG CS2BEAM","Iran","ADM","MELA","Internal Assignment","RBD","GCU MTN & CU MTN Africa","APR-298480","IP & Transport","Data Scientist","2020-07-20","2020-07-24",50," Percent",100,216.0]]}

In [0]:
# # Load the model in `python_function` format
# loaded_preprocess_model = mlflow.pyfunc.load_model(model_path_preproc)
# loaded_preprocess_model.predict(input_data2)

In [0]:
# # Load the model in `python_function` format
# #artifact_path = artif_loc + run_id + '/artifacts/' + 'classifier-preproc'
# loaded_preprocess_model = mlflow.pyfunc.load_model(model_path_preproc)
# loaded_preprocess_model.predict(input_data2)