# Step 1. Get real data

In [47]:
def retrieve_data(outputUriPrefix):
    """Load a Datastore export into BigQuery and return the loan features.

    The export found under ``outputUriPrefix`` is loaded into the table
    ``mmtc-staging.analytics.latest_dataset_<mmddyy>`` (replacing any table of
    the same name), an ``AppraisedValue`` FLOAT column is added to the schema
    when it is missing, and the feature query is run against that table.

    Args:
        outputUriPrefix: ``gs://`` prefix of the Datastore export, without a
            trailing slash.

    Returns:
        The query result as a DataFrame indexed by ``ID``.
    """
    from datetime import date

    from google.cloud import bigquery

    def create_table(client, source_uri, table_id):
        """Load the Datastore backup at source_uri into table_id and wait."""
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = "DATASTORE_BACKUP"
        job_config.write_disposition = "WRITE_TRUNCATE"

        load_job = client.load_table_from_uri(source_uri,
                                              table_id,
                                              job_config=job_config)
        # Block until the load finishes (and raise if it failed); otherwise
        # the table may not exist yet when it is read below.
        load_job.result()

    client = bigquery.Client(project='mmtc-staging')

    # Computed once so the table that is loaded and the table that is queried
    # always carry the same date suffix.
    today = date.today().strftime("%m%d%y")
    table_id = "mmtc-staging.analytics." + "latest_dataset_" + today

    source_uri = outputUriPrefix + "/all_namespaces/kind_analytics/all_namespaces_kind_analytics.export_metadata"
    create_table(client, source_uri, table_id)

    table = client.get_table(table_id)  # Make an API request.

    # The query below selects AppraisedValue, so make sure the column exists.
    # Skip the update when it is already there: re-adding a field is an error.
    if all(field.name != "AppraisedValue" for field in table.schema):
        new_schema = list(table.schema)
        new_schema.append(bigquery.SchemaField("AppraisedValue", "FLOAT"))
        table.schema = new_schema
        client.update_table(table, ["schema"])  # Make an API request.

    # perform a query
    query = """
    SELECT ID, FundingSource, LTV, State, LoanProcessor,LoanOfficer, LoanAmount, 
           IF((ABS(TimeWindow)>1000),0,TimeWindow) AS TimeWindow, 
           AppraisedValue
    FROM  
      (SELECT ID, FundingSource, LTV, State, LoanProcessor,LoanOfficer, LoanAmount, AppraisedValue, 
              (DATETIME_DIFF(parse_datetime('%m/%d/%Y',CloseDate), parse_datetime('%m/%d/%Y',SignedDate), DAY)) AS TimeWindow    
       FROM
            (SELECT  
                LoanOriginationSystem_LoanOriginationSystemLoanIdentifier AS ID,
                IFNULL(FundingSource_FullName,'TBD_FS') AS FundingSource, 
                IFNULL(SubjectLoan_AppraisedARLTVRatioPercent.float, 0) AS LTV,
                IFNULL(SubjectProperty_StateCode,"UNKNOWN") AS State,
                IFNULL(SubjectLoan_BorrowerRequestedLoanAmount,0) AS LoanAmount,
                IFNULL(AppraisedValue,0) AS AppraisedValue,
                IFNULL(REPLACE(LenderRepresentative_LenderRepresentativeSignatureName,' ',''),'TBD_LP') AS LoanProcessor,
                IFNULL(REPLACE(LoanOfficer_FullName,' ',''),'TBD_LO') AS LoanOfficer,
                IF(REGEXP_CONTAINS(SubjectLoan_LoanEstimatedClosingDate, '^[0-9]'), 
                    SubjectLoan_LoanEstimatedClosingDate, "01/01/2000") AS CloseDate,
                IF(REGEXP_CONTAINS(Borrower_BorrowerApplicationSignedDate, '^[0-9]'), 
                    Borrower_BorrowerApplicationSignedDate, "01/01/2000") AS SignedDate
            FROM
              `mmtc-staging.analytics.latest_dataset_{}`) AS Table_1)
    WHERE 
        ID IS NOT NULL
    """.format(today)

    query_job = client.query(query)  # Make an API request.
    df = query_job.to_dataframe()
    return df.set_index("ID")

# Step2. Get train data and train model

In [48]:
def read_input(train_data, real_X, random_state=None):
    """Build a class-balanced training set and vectorize train and real data.

    Rows of the training CSV with a missing ``Closed`` label are dropped. The
    withdrawn loans (``Closed == 0``) are then resampled with replacement to
    the same number of rows as the closed loans (``Closed == 1``), so both
    classes are equally represented. A single ``DictVectorizer`` is fitted on
    the training rows and reused for ``real_X`` so both matrices share the
    same columns.

    Args:
        train_data: Path (or buffer) of the training CSV; it must contain the
            columns ``Closed`` and ``LoanNo.``.
        real_X: DataFrame of real records to score.
        random_state: Optional seed forwarded to ``DataFrame.sample`` to make
            the resampling reproducible.

    Returns:
        Tuple ``(train_X, train_y, test_X)``: the dense training matrix, the
        training labels, and the dense matrix for ``real_X``.

    Raises:
        ValueError: If either class has no rows in the training data.
    """
    import pandas as pd
    from sklearn.feature_extraction import DictVectorizer

    data = pd.read_csv(train_data)
    data = data.dropna(axis=0, subset=['Closed']).set_index('LoanNo.')

    closed = data[data.Closed == 1]
    withdraw = data[data.Closed == 0]
    if closed.empty or withdraw.empty:
        raise ValueError(
            "training data must contain rows for both Closed == 1 and Closed == 0"
        )

    # Size the resample from the closed class itself. value_counts() orders by
    # frequency, so unpacking it does not reliably give the count of class 1.
    overwd = withdraw.sample(len(closed), replace=True,
                             random_state=random_state)
    oversample = pd.concat([closed, overwd], axis=0)

    train_y = oversample.Closed
    train_X = oversample.drop(['Closed'], axis=1)

    vec = DictVectorizer()
    train_X = vec.fit_transform(train_X.to_dict(orient='records')).toarray()
    # transform() only: the real data must be encoded with the columns learned
    # from the training data.
    test_X = vec.transform(real_X.to_dict(orient='records')).toarray()

    return (train_X, train_y, test_X)

In [49]:
def train_model(train_X,
                train_y,
                n_estimators,
                max_depth,
                learning_rate):
    """Train an XGBoost classifier on the given features and labels.

    Args:
        train_X: Training feature matrix.
        train_y: Training labels.
        n_estimators: Passed to ``XGBClassifier(n_estimators=...)``.
        max_depth: Passed to ``XGBClassifier(max_depth=...)``.
        learning_rate: Passed to ``XGBClassifier(learning_rate=...)``.

    Returns:
        The fitted ``XGBClassifier``.
    """
    from xgboost import XGBClassifier

    model = XGBClassifier(n_estimators=n_estimators,
                          max_depth=max_depth,
                          learning_rate=learning_rate)
    return model.fit(train_X, train_y)

# Step3. Make prediction on real data

In [50]:
def real_predict(model, test_X, real_X):
    """Score the real records and attach the predictions to them.

    Args:
        model: Fitted binary classifier exposing ``predict`` and
            ``predict_proba``.
        test_X: Feature matrix whose rows are in the same order as ``real_X``.
        real_X: DataFrame of the records being scored.

    Returns:
        A copy of ``real_X`` with three extra columns: ``prob_0`` and
        ``prob_1`` (class probabilities rounded to 4 decimals) and
        ``pred_class`` (the predicted label).

    Raises:
        ValueError: If the model does not return exactly two probability
            columns, or the number of predictions differs from the number of
            rows in ``real_X``.
    """
    import numpy as np

    pred_cls = np.asarray(model.predict(test_X)).reshape(-1)
    pred_prob = np.round(np.asarray(model.predict_proba(test_X)), 4)

    if pred_prob.ndim != 2 or pred_prob.shape[1] != 2:
        raise ValueError("predict_proba must return exactly two columns")
    if len(pred_cls) != len(real_X) or len(pred_prob) != len(real_X):
        raise ValueError("number of predictions does not match rows of real_X")

    # Attach the columns by position rather than merging on the index: an
    # index-on-index merge multiplies rows whenever an ID occurs more than once.
    return real_X.assign(prob_0=pred_prob[:, 0],
                         prob_1=pred_prob[:, 1],
                         pred_class=pred_cls)

# Step 4. Add the prediction results to Firestore

In [51]:
def add_pred(full_df):
    """Write each record's predicted closing probability to Firestore.

    For every row of ``full_df`` the value of ``prob_1`` is stored in the
    field ``SubjectLoan_PredictedClosingPercent`` of the document
    ``analytics/<index value>``. The write uses ``merge=True`` so other fields
    of an existing document are kept.

    Args:
        full_df: DataFrame indexed by document ID with a ``prob_1`` column.
    """
    from google.cloud import firestore

    # One client for the whole run instead of one per document.
    db = firestore.Client(project='mmtc-staging')
    analytics = db.collection(u'analytics')

    # The write must happen inside the loop; doing it once after the loop
    # would only store the last row.
    for doc_id, prob in full_df["prob_1"].items():
        analytics.document(str(doc_id)).set(
            {u'SubjectLoan_PredictedClosingPercent': float(prob)},
            merge=True,
        )
    print('accomplished!')

# Step5. Save Model as joblib

In [52]:
def save_model(model, model_file):
    """Save the trained model to ``model_file`` with joblib for serving.

    Args:
        model: The fitted model object to serialize.
        model_file: Destination passed to ``joblib.dump``.
    """
    import logging

    import joblib

    joblib.dump(model, model_file)
    logging.info("Model export success: %s", model_file)

# Practice

In [53]:
# Load this Datastore export into BigQuery and fetch the records to score.
real_X = retrieve_data("gs://mmtc-staging.appspot.com/2020-07-31T17:25:04_27449")

In [54]:
# Build the training matrix/labels from the CSV and vectorize real_X alongside it.
(train_X, train_y, test_X)=read_input('test_new.csv',real_X)

In [55]:
# Hyperparameters are named explicitly so the call reads without the signature.
model = train_model(
    train_X,
    train_y,
    n_estimators=300,
    max_depth=5,
    learning_rate=0.3,
)

In [56]:
# Score the real records and attach prob_0 / prob_1 / pred_class to them.
full_df = real_predict(model, test_X, real_X)

In [57]:
# Preview the first rows of the scored frame.
full_df.head()

Unnamed: 0_level_0,FundingSource,LTV,State,LoanProcessor,LoanOfficer,LoanAmount,TimeWindow,AppraisedValue,prob_0,prob_1,pred_class
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
1778808f-33e7-4f66-b02d-f051b0d720f8,TBD_FS,0.0,UNKNOWN,TBD_LP,MeganTrammel,420000,0,0.0,0.0026,0.9974,1
24c0ceef-ac92-4fcc-86ec-20a643f3f4b4,TBD_FS,0.0,UNKNOWN,TBD_LP,AaronPfeffer,300000,0,0.0,0.4659,0.5341,1
29a60b1e-0540-4a43-8365-40d496eace1f,TBD_FS,0.0,AZ,TBD_LP,HerbBourdeaux,0,0,0.0,0.8662,0.1338,0
4df33c06-ded2-42ce-a937-0358aa4db42e,TBD_FS,0.0,UNKNOWN,TBD_LP,TBD_LO,0,0,0.0,0.5401,0.4599,0
54eeb0f8-df00-4390-a52a-242baa0fe59e,TBD_FS,0.0,UNKNOWN,TBD_LP,SusanAubin,0,0,0.0,0.424,0.576,1


In [13]:
add_pred(full_df)

accomplished!


# Step 6: Install the Kubeflow Pipelines SDK

In [68]:
!pip install -q kfp --upgrade --user

# Step 7: Convert Python scripts to docker containers

In [70]:
import json
import kfp
from kfp import components as comp
from kfp import dsl
import os
import subprocess
# Each function is packaged as a lightweight component that runs inside the
# base image. The functions import xgboost / pandas / numpy at call time, so
# those packages are installed when the container starts.
# NOTE(review): presumably the TensorFlow image does not ship xgboost or
# pandas -- confirm against the image actually deployed.
train_op = comp.func_to_container_op(
    train_model,
    base_image='tensorflow/tensorflow:latest-gpu-py3',
    packages_to_install=['xgboost'],
)
predict_op = comp.func_to_container_op(
    real_predict,
    base_image='tensorflow/tensorflow:latest-gpu-py3',
    packages_to_install=['pandas', 'numpy'],
)

# Step 8: Define Kubeflow pipeline

In [71]:
# Create a Kubeflow Pipelines client bound to this host.
client = kfp.Client(host='31b386be1f929b65-dot-us-central2.pipelines.googleusercontent.com')

In [None]:
# Define the pipeline
@dsl.pipeline(
   name='AnalyticsPipeline_1',
   description='A pipeline that performs closing possibility model training and prediction.'
)
def analytics_container_pipeline(
    data_path: "gs://mmtc-staging.appspot.com/2020-07-31T17:25:04_27449",
    model_file: "mmtc-staging", 
    image_number: int
):
    """Chain the training, prediction and result-printing steps on one volume.

    NOTE(review): the string literals after ``data_path:`` and ``model_file:``
    are annotations, not default values -- confirm whether defaults were
    intended.
    """
    # Define volume to share data between components.
    vop = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWM,
    )

    # Create training component.
    # NOTE(review): train_op wraps train_model, which takes five parameters
    # (train_X, train_y, n_estimators, max_depth, learning_rate); only two
    # arguments are passed here -- confirm the intended inputs.
    analytics_training_container = train_op(data_path, model_file) \
                                    .add_pvolumes({data_path: vop.volume})

    # Create prediction component; it reuses the volume written by training.
    # NOTE(review): predict_op wraps real_predict(model, test_X, real_X) --
    # confirm these three arguments are what that function expects.
    analytics_predict_container = predict_op("gs://mmtc-staging.appspot.com/2020-07-31T17:25:04_27449", model_file, image_number) \
                                    .add_pvolumes({data_path: analytics_training_container.pvolume})

    # Print the result of the prediction
    analytics_result_container = dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        pvolumes={data_path: analytics_predict_container.pvolume},
        arguments=['cat', f'{data_path}/result.txt']
    )