In [0]:
pip install azureml-mlflow

In [0]:
import mlflow

In [0]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,DoubleType
# Location and parsing options for the sample enquiries CSV.
file_location = "/FileStore/tables/sample_v1.csv"

file_type = "csv"
# CSV options (also reused when reading the flag file further down)
first_row_is_header = "true"
delimiter = ","

# Explicit schema for the sample enquiries.
# NOTE(review): with header=true the names below replace the header names, so
# this assumes the CSV columns appear in exactly this order - confirm against the file.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("en_inst", StringType(), True),
    StructField("date", StringType(), True),   # kept as text; parsed later in pandas
    StructField("amount", DoubleType(), True),
    StructField("enq_class", StringType(), True),
    StructField("enq_cust", StringType(), True),
])

# The schema has to be supplied through DataFrameReader.schema(); handing it to
# .option("schema", ...) is not a CSV option, so the schema was never applied
# and every column (including `amount`) came back as a string.
df = (
    spark.read.format(file_type)
    .schema(schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(df)
# Reading the sample flagged file (one label per customer).
file_location1 = "/FileStore/tables/flag-1.csv"

# NOTE(review): assumes the CSV columns are (enq_cust, flag) in this order - confirm.
schema1 = StructType([
    StructField("enq_cust", StringType(), True),
    StructField("flag", IntegerType(), True),
])

# Apply the schema with DataFrameReader.schema(); .option("schema", ...) is not
# a CSV option, so `flag` was previously read as a string instead of an integer.
df1 = (
    spark.read.format(file_type)
    .schema(schema1)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location1)
)

# Customer '10000' is excluded here, matching the filter applied to the enquiries.
df1 = df1.filter("""enq_cust!='10000'""")
display(df1)

In [0]:
import pandas as pd
from  datetime import datetime
import numpy as np
# Bring the enquiries into pandas for the feature roll-ups below.
file = df.toPandas()
# Drop customer '10000' (the same id is filtered out of df1). The explicit copy
# makes the filtered frame independent, so the column assignment below does not
# raise SettingWithCopyWarning.
file = file[file.enq_cust != '10000'].copy()
# dayfirst expects a bool; the string 'true' only worked because it is truthy.
file['date'] = pd.to_datetime(file.date, dayfirst=True)

In [0]:
'''
Filter and feature generator
'''
def feature_mom_gen(file,last_date,filter_mon,grp_cols,date,agg_cols,renam_cols):
    """Aggregate the rows of ``file`` that fall inside a trailing month window.

    Parameters
    ----------
    file : pandas.DataFrame holding the raw rows.
    last_date : str, window end in ``'%Y-%m-%d'`` format (inclusive).
    filter_mon : int, window length in months counted back from ``last_date``.
    grp_cols : list of column names to group by.
    date : name of the datetime column used for the window filter.
    agg_cols : aggregation spec passed straight to ``DataFrameGroupBy.agg``.
    renam_cols : flat column names assigned to the aggregated result.

    Returns
    -------
    pandas.DataFrame with one row per group and columns ``renam_cols``.
    """
    window_start = datetime.strptime(last_date, '%Y-%m-%d') - pd.DateOffset(months=filter_mon)

    # Both window bounds are inclusive.
    not_after_end = file[date] <= last_date
    not_before_start = file[date] >= window_start
    in_window = file[not_after_end & not_before_start]

    rolled = in_window.groupby(grp_cols, as_index=False).agg(agg_cols)
    # Replace the (column, aggregation) labels produced by agg() with flat names.
    rolled.columns = renam_cols
    return rolled

'''
Column name standardizer
'''
def name_standard(frame,id,columns):
    """Pivot every measure column of ``frame`` wide and flatten the column names.

    Every column of ``frame`` that is neither in ``id`` nor in ``columns`` is
    treated as a measure. Each measure is pivoted (summed) with ``id`` as the
    row key and ``columns`` as the pivot columns, and the resulting labels are
    flattened to ``<measure>_<label1>_<label2>_..._`` (trailing underscore kept).
    The per-measure frames are then left-joined on ``id``.

    Parameters
    ----------
    frame : pandas.DataFrame in long format.
    id : column name or list of column names identifying one output row.
    columns : column name or list of column names whose values become part of
        the output column names.

    Returns
    -------
    pandas.DataFrame with the ``id`` column(s) followed by one column per
    (measure, pivot label) combination; an empty frame if there are no measures.
    """
    id_cols = [id] if isinstance(id, str) else list(id)
    pivot_cols = [columns] if isinstance(columns, str) else list(columns)

    measures = frame.columns[~frame.columns.isin(pivot_cols)]
    measures = measures[~measures.isin(id_cols)]

    result = None
    for measure in measures:
        wide = pd.pivot_table(frame, values=measure, index=id_cols,
                              columns=pivot_cols, aggfunc='sum')

        flat_names = []
        for label in wide.columns:
            # With one pivot column the labels are scalars, not tuples. Iterating
            # a scalar string would split it into characters ('AB' -> 'A_B_'),
            # so wrap it before joining.
            parts = label if isinstance(label, tuple) else (label,)
            flat_names.append(f"{measure}_" + ''.join(f"{part}_" for part in parts))

        wide = wide.reset_index()
        # Use the caller's key names rather than a hard-coded 'enq_cust'.
        wide.columns = id_cols + flat_names

        if result is None:
            result = wide
        else:
            result = result.merge(wide, how='left', on=id_cols)

    return pd.DataFrame() if result is None else result
    

In [0]:
'''
Enq Rollup
'''
def _window_rollup(file, last_date, grp_cols, windows=(20, 9, 6, 3)):
    """Sum and count ``amount`` per group over each trailing month window.

    Calls ``feature_mom_gen`` once per window and left-joins the results on
    ``grp_cols``, starting from the first (widest) window. Output columns are
    ``grp_cols`` followed by ``sum_amt_<n>m`` / ``cnt_<n>m`` for each window.
    """
    grp_cols = list(grp_cols)
    rolled = None
    for months in windows:
        names = grp_cols + [f'sum_amt_{months}m', f'cnt_{months}m']
        part = feature_mom_gen(file, last_date, months, grp_cols, 'date',
                               {'amount': ['sum', 'count']}, names)
        rolled = part if rolled is None else rolled.merge(part, how='left', on=grp_cols)
    return rolled


def enq_rllup(file,last_date):
    """Build one row of enquiry features per customer.

    Amount sums and counts over the trailing 20, 9, 6 and 3 months (ending at
    ``last_date``) are computed at three levels - institution x class, class,
    and customer - and joined on ``enq_cust``.

    Parameters
    ----------
    file : pandas.DataFrame with columns ``en_inst``, ``enq_class``,
        ``enq_cust``, ``amount`` and a datetime ``date`` column.
    last_date : str, end of the observation window in ``'%Y-%m-%d'`` format.

    Returns
    -------
    pandas.DataFrame keyed by ``enq_cust``; every other column is float64 with
    missing values replaced by 0.
    """
    # Enq Inst level rollup
    inst_level = _window_rollup(file, last_date, ['en_inst', 'enq_class', 'enq_cust'])
    inst_level = name_standard(inst_level, ['enq_cust'], ['en_inst', 'enq_class']).fillna(0)

    # Enq Class level rollup
    # The 20-month sum is now named sum_amt_20m here and at customer level;
    # it was previously mislabelled 'sum_amt_250m'.
    class_level = _window_rollup(file, last_date, ['enq_class', 'enq_cust'])
    class_level = name_standard(class_level, ['enq_cust'], ['enq_class']).fillna(0)

    # Enq cust level rollup
    cust_level = _window_rollup(file, last_date, ['enq_cust']).fillna(0)

    feature_base_final = (
        inst_level
        .merge(class_level, how='left', on=['enq_cust'])
        .merge(cust_level, how='left', on=['enq_cust'])
    )
    # fillna returns a new frame; the result used to be discarded, leaving any
    # NaN introduced by the left joins in place.
    feature_base_final = feature_base_final.fillna(0)

    for col in feature_base_final.columns:
        if col != 'enq_cust':
            feature_base_final[col] = feature_base_final[col].astype('float64')

    return feature_base_final



In [0]:
# Build the per-customer feature frame using 2021-03-31 as the window end date.
feature_base_final=enq_rllup(file,'2021-03-31')
feature_base_final

In [0]:
%sql
-- Create the database that persists the feature tables, if it does not exist yet.
-- NOTE(review): the commented-out drop below targets enq_feature_store1, not the database created here.
--drop DATABASE enq_feature_store1;
CREATE DATABASE IF NOT EXISTS enq_feature_store4;

In [0]:
'''
Create storage of features
'''
from databricks import feature_store

# Feature store client; also used below to build the training set and log the model.
fs=feature_store.FeatureStoreClient()
# Register feature_base_final as a feature table keyed by enq_cust.
# NOTE(review): the table is partitioned on the same column that is its key -
#   confirm this is intended.
# NOTE(review): presumably this call fails on re-run once the table exists - confirm.
fs.create_feature_table(
    name='enq_feature_store4.feature_base_final',
    keys=['enq_cust'],
    features_df=spark.createDataFrame(feature_base_final),
    partition_columns=['enq_cust'],
    description='ENQ INST level',
    
)

In [0]:
'''
Feature lookup
'''
from databricks.feature_store import feature_table
from databricks.feature_store import FeatureLookup
# Read a few rows of the feature table only to discover its column names.
feature_names=spark.sql('''Select * from enq_feature_store4.feature_base_final limit 10''').toPandas()
# Every column except the lookup key is requested as a feature.
feature_names=feature_names.drop(columns=['enq_cust'])
# Single lookup: all features from the table, joined on enq_cust.
feature_lookups = [
    FeatureLookup(
      table_name = 'enq_feature_store4.feature_base_final',
      feature_names = list(feature_names.columns),
      lookup_key = 'enq_cust',
    )
  ]


In [0]:
from mlflow import pyfunc
# Close any run left open by a previous execution of this cell.
mlflow.end_run()

# Start an mlflow run, which is needed for the feature store to log the model
mlflow.start_run()

# Create the training set: the labelled customers in df1 joined with the
# features resolved through feature_lookups.
training_set = fs.create_training_set(
  df1,
  feature_lookups = feature_lookups,
  label = "flag",
  # exclude_columns takes a list of column names; a bare string only worked
  # by accident.
  exclude_columns = ['enq_cust']
)

# Load the TrainingSet into a dataframe which can be passed into sklearn for training a model
training_df = training_set.load_df()

In [0]:
# Inspect the assembled training DataFrame.
display(training_df)

In [0]:
from sklearn.model_selection import train_test_split
from mlflow.tracking import MlflowClient
import lightgbm as lgb
import mlflow.lightgbm
from mlflow.models.signature import infer_signature
 
features_and_label = training_df.columns

# Collect data into a Pandas array for training; missing feature values become 0
data = training_df.toPandas()[features_and_label].fillna(0)

# Fixed random_state keeps the train/test split reproducible.
train, test = train_test_split(data, random_state=123)
X_train = train.drop(["flag"], axis=1)
X_test = test.drop(["flag"], axis=1)
y_train = train.flag
y_test = test.flag

mlflow.lightgbm.autolog()
train_lgb_dataset = lgb.Dataset(X_train, label=y_train.values)
# The validation set references the training set so both share the same binning.
test_lgb_dataset = lgb.Dataset(X_test, label=y_test.values, reference=train_lgb_dataset)

# "logloss" is not a documented LightGBM metric name; the binary log loss
# metric is called "binary_logloss".
param = {"num_leaves": 32, "objective": "binary", "metric": "binary_logloss"}
num_rounds = 100

# Train a lightGBM model. The held-out set is passed as a validation set so the
# metric is evaluated each round; previously it was built but never used.
model = lgb.train(
  param, train_lgb_dataset, num_rounds,
  valid_sets=[test_lgb_dataset], valid_names=["test"]
)

In [0]:
'''
Register model
'''
# Log the trained LightGBM model through the feature store client, passing the
# training set it was built from, and register it as "enq_sample_model".
fs.log_model(
  model,
  artifact_path="model_packaged",
  flavor=mlflow.lightgbm,
  training_set=training_set,
  registered_model_name="enq_sample_model"
)

In [0]:
'''
Latest version of model
'''
def get_latest_model_version(model_name):
  """Return the highest registered version number of ``model_name``.

  Queries the MLflow model registry for every version of the model and
  returns the largest version as an int. The result is never below 1, which
  is also what is returned when the search yields no versions.
  """
  client = MlflowClient()
  found = [int(mv.version) for mv in client.search_model_versions(f"name='{model_name}'")]
  # Seed with 1 so the floor matches the original default.
  return max([1, *found])


In [0]:
# Build the registry URI of the highest registered version of the model.
latest_model_version = get_latest_model_version("enq_sample_model")
model_uri = f"models:/enq_sample_model/{latest_model_version}"

In [0]:
# Show the resolved model URI.
model_uri