For the given use case, we can consider the following features for a loan application dataset:

existing_loans: Number of existing loans of the applicant.
existing_loan_amounts: Total amount of existing loans.
checking_account_status: Status of the applicant's checking account.
savings_account_status: Status of the applicant's savings account.
credit_history: The applicant's credit history.
employment: Employment status of the applicant.
annual_income: The applicant's annual income.
loan_purpose: Purpose of the new loan.
loan_duration: Duration of the new loan (in months).
installment_rate: Installment rate as a percentage of the applicant's income.
personal_status: Personal status and sex of the applicant.
other_debtors: Other debtors/guarantors associated with the loan.
residence_since: Number of years the applicant has been living at their current residence.
property: Property owned by the applicant.
age: Age of the applicant.
other_installment_plans: Other installment plans the applicant has.
housing: Housing status of the applicant.
job: Job status of the applicant.
dependents: Number of dependents the applicant has.
telephone: Whether the applicant has a telephone registered under their name.
foreign_worker: Whether the applicant is a foreign worker.

## Step 1: Calculate risk

Train a machine learning model using the above features to predict the risk category (low, medium, high) for each loan application.

## Step 2: Calculate new loan amount

Based on the predicted risk category and the applicant's financial status (e.g., annual income, existing loans, credit history), you can define rules or train another machine learning model to estimate the loan amount that should be approved.

## Generate the Data for the two models

In [93]:
import pandas as pd
from faker import Faker
import numpy as np

# define the categorical mapping
cat_map = {
    'checking_account_status': {0: 'negative', 1: 'no_account', 2: 'good_standing', 3: 'overdrawn'},
    'savings_account_status': {0: 'negative', 1: 'no_account', 2: 'good_standing', 3: 'overdrawn'},
    'employment': {0: 'unemployed', 1: 'full time', 2: 'part time', 3: 'contractor', 4: 'unemployed'},
    'credit_history': {0: 'no_credit', 1: 'all_paid', 2: 'existing_credits_paid_back', 3: 'delay_in_paying_off', 4: 'critical_account'},
    'loan_purpose': {0: 'car', 1: 'education', 2: 'home', 3: 'business', 4: 'debt_consolidation'},
    'personal_status': {0: 'single_male', 1: 'single_female', 2: 'married', 3: 'divorced'},
    'other_debtors': {0: 'none', 1: 'co-applicant', 2: 'guarantor'},
    'other_installment_plans': {0: 'none', 1: 'bank', 2: 'stores'},
    'housing': {0: 'own', 1: 'rent', 2: 'free'},
    'job': {0: 'unskilled_resident', 1: 'skilled_employee', 2: 'management_self-employed', 3: 'unemployed'},
    'risk': {0: 'low', 1: 'medium', 2: 'high'}
}

# create Faker object
fake = Faker()

# generate credit data
def generate_credit_data(n):
    data = {
        'checking_account_status': [],
        'savings_account_status': [],
        'credit_history': [],
        'employment': [],
        'loan_purpose': [],
        'loan_duration': [],
        'installment_rate': [],
        'personal_status': [],
        'other_debtors': [],
        'age': [],
        'other_installment_plans': [],
        'housing': [],
        'job': [],
        'dependents': [],
        'annual_income': [],
        'existing_loan_amounts': [],
        'risk': [],
        'loan_amount': [],
        'credit_amount': []
    }

    for i in range(n):
        checking_account_status = fake.random_int(min=0, max=3)
        savings_account_status = fake.random_int(min=0, max=3)

        duration = int(np.random.normal(loc=24, scale=12))  # Change distribution
        credit_history = fake.random_int(min=0, max=4)
        loan_purpose = fake.random_int(min=0, max=4)
        loan_duration = int(np.random.normal(loc=24, scale=12))
        
        credit_amount = int(np.random.normal(loc=5000, scale=2500))  # Change distribution
        annual_income = int(np.random.normal(loc=75000, scale=50000))
        
        existing_loan_amounts = int(np.random.normal(loc=60000, scale=20000))
        
        employment = fake.random_int(min=0, max=4)
        installment_rate = fake.random_int(min=1, max=35)
        personal_status = fake.random_int(min=0, max=4)
        other_debtors = fake.random_int(min=0, max=2)

        age = int(np.random.normal(loc=40, scale=10))  # Change distribution
        other_installment_plans = fake.random_int(min=0, max=2)
        housing = fake.random_int(min=0, max=2)
        job = fake.random_int(min=0, max=3)
        dependents = fake.random_int(min=1, max=2)

        risk = fake.random_int(min=0, max=2)
        

        if risk == 0:  # Low risk
            loan_amount = np.random.uniform(50000, 100000) * 0.6
        elif risk == 1:  # Medium risk
            loan_amount = np.random.uniform(50000, 100000) * 1.2
        else:  # High risk
            loan_amount = np.random.uniform(50000, 100000) * 3

        # add the data to the dictionary
        for key, value in data.items():
            if key in data:
                data[key].append(eval(key))

    return pd.DataFrame(data)

# create a function to introduce drift into the credit data
def introduce_drift(data):
    # modify features to gradually increase over time
    for i in range(len(data)):
        # gradually increase credit amount
        data.at[i, 'credit_amount'] = int(data.at[i, 'credit_amount'] * (1 + 0.05 * (i/len(data))))
        # gradually increase duration
        data.at[i, 'loan_duration'] = int(data.at[i, 'loan_duration'] * (1 + 0.01 * (i/len(data))))
        # gradually increase age
        data.at[i, 'age'] = int(data.at[i, 'age'] * (1 + 0.02 * (i/len(data))))
        # gradually increase existing loan amounts
        data.at[i, 'existing_loan_amounts'] = int(data.at[i, 'existing_loan_amounts'] * (1 + 0.05 * (i/len(data))))

    return data


# create a function to introduce outliers into the credit data
def introduce_outliers(data):
    # randomly select 10 indices to add outliers to
    indices = np.random.choice(len(data), size=10, replace=False)
    for i in indices:
        # change credit amount and duration to high values
        data.at[i, 'credit_amount'] = 50000
        data.at[i, 'duration'] = 84
        
    return data

# generate baseline credit data
baseline_data = generate_credit_data(2000)
# print(baseline_data.info())
# save baseline data to CSV
baseline_data.to_csv('baseline_credit_data.csv', index=False)

# generate drifted credit data
drifted_data = introduce_drift(generate_credit_data(1000))

# save drifted data to CSV
drifted_data.to_csv('drifted_credit_data.csv', index=False)

# generate credit data with outliers
outlier_data = introduce_outliers(generate_credit_data(1000))

# save outlier data to CSV
outlier_data.to_csv('outlier_credit_data.csv', index=False)



In [94]:
# generate baseline credit data
# baseline_data = generate_credit_data(1)
cat_map

{'checking_account_status': {0: 'negative',
  1: 'no_account',
  2: 'good_standing',
  3: 'overdrawn'},
 'savings_account_status': {0: 'negative',
  1: 'no_account',
  2: 'good_standing',
  3: 'overdrawn'},
 'employment': {0: 'unemployed',
  1: 'full time',
  2: 'part time',
  3: 'contractor',
  4: 'unemployed'},
 'credit_history': {0: 'no_credit',
  1: 'all_paid',
  2: 'existing_credits_paid_back',
  3: 'delay_in_paying_off',
  4: 'critical_account'},
 'loan_purpose': {0: 'car',
  1: 'education',
  2: 'home',
  3: 'business',
  4: 'debt_consolidation'},
 'personal_status': {0: 'single_male',
  1: 'single_female',
  2: 'married',
  3: 'divorced'},
 'other_debtors': {0: 'none', 1: 'co-applicant', 2: 'guarantor'},
 'other_installment_plans': {0: 'none', 1: 'bank', 2: 'stores'},
 'housing': {0: 'own', 1: 'rent', 2: 'free'},
 'job': {0: 'unskilled_resident',
  1: 'skilled_employee',
  2: 'management_self-employed',
  3: 'unemployed'},
 'risk': {0: 'low', 1: 'medium', 2: 'high'}}

In [95]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, mean_squared_error
import xgboost as xgb
from imblearn.over_sampling import SMOTE


# Load your dataset
data = pd.read_csv('baseline_credit_data.csv')

# Preparing the data for the loan risk model
risk_data = data[['checking_account_status', 'savings_account_status', 'credit_history', 'employment', 'loan_purpose', 'installment_rate', 'personal_status', 'other_debtors', 'age', 'job', 'dependents', 'risk','existing_loan_amounts']]
X_risk = risk_data.drop('risk', axis=1)
y_risk = risk_data['risk']

# Balancing the dataset
oversample = SMOTE(random_state=42)
X_risk_balanced, y_risk_balanced = oversample.fit_resample(X_risk, y_risk)
print("Balanced Loan Risk Dataset Shape:", X_risk_balanced.shape)

# Splitting the data into train and test sets
X_risk_train, X_risk_test, y_risk_train, y_risk_test = train_test_split(X_risk_balanced, y_risk_balanced, test_size=0.2, random_state=42)
print(X_risk_train.shape)
# Train the XGBoost classifier for risk
risk_model = xgb.XGBClassifier(
    n_estimators=100,
    max_depth=3,
    learning_rate=0.1,
    objective='multi:softprob',
    random_state=42
)
# xgb.XGBClassifier(use_label_encoder=False, objective='multi:softprob', num_class=3, random_state=42)
risk_model.fit(X_risk_train, y_risk_train)

# Testing the loan risk model
y_risk_pred = risk_model.predict(X_risk_test)
risk_accuracy = accuracy_score(y_risk_test, y_risk_pred)
print("Loan Risk Model Accuracy: {:.2f}%".format(risk_accuracy * 100))




Balanced Loan Risk Dataset Shape: (2097, 12)
(1677, 12)


pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.


Loan Risk Model Accuracy: 31.43%


In [96]:
len(risk_data)

2000

In [97]:
print(len(risk_model.feature_importances_))

12


In [98]:
X_risk_test

Unnamed: 0,checking_account_status,savings_account_status,credit_history,employment,loan_purpose,installment_rate,personal_status,other_debtors,age,job,dependents,existing_loan_amounts
29,0,1,4,0,3,2,3,0,52,3,1,55477
845,3,3,0,2,0,11,4,2,41,1,1,93620
812,0,2,4,0,1,23,4,1,40,0,2,66560
432,3,0,4,3,4,17,3,2,42,0,2,76541
1384,3,0,4,4,2,22,4,1,26,1,2,56057
...,...,...,...,...,...,...,...,...,...,...,...,...
76,0,1,1,0,1,32,2,1,31,3,2,64155
205,0,0,2,1,3,22,3,0,32,1,2,99770
2068,1,0,0,3,2,4,2,0,48,3,1,60337
1286,1,2,1,4,2,5,0,0,28,0,1,84807


In [99]:
# Get feature names
risk_feature_names = risk_model.get_booster().feature_names
X_risk_test.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 420 entries, 29 to 1687
Data columns (total 12 columns):
 #   Column                   Non-Null Count  Dtype
---  ------                   --------------  -----
 0   checking_account_status  420 non-null    int64
 1   savings_account_status   420 non-null    int64
 2   credit_history           420 non-null    int64
 3   employment               420 non-null    int64
 4   loan_purpose             420 non-null    int64
 5   installment_rate         420 non-null    int64
 6   personal_status          420 non-null    int64
 7   other_debtors            420 non-null    int64
 8   age                      420 non-null    int64
 9   job                      420 non-null    int64
 10  dependents               420 non-null    int64
 11  existing_loan_amounts    420 non-null    int64
dtypes: int64(12)
memory usage: 42.7 KB


### Save the Model in the .bst format

In [100]:
risk_model.save_model('./model/risk/model.bst')

In [101]:
!gsutil cp ./model/risk/model.bst gs://josh-seldon/loan-default-calculator/models/risk/model.bst

Copying file://./model/risk/model.bst [Content-Type=application/octet-stream]...
/ [1 files][185.7 KiB/185.7 KiB]                                                
Operation completed over 1 objects/185.7 KiB.                                    


## Create model-setting.json file

In [None]:
%%writefile ./model/risk/model-settings.json
{
    "implementation": "mlserver_xgboost.XGBoostModel",
    "parameters": {
        "uri": "model.bst",
        "version": "v0.1.0"
    }
}

In [None]:
!gsutil cp ./model/risk/model-settings.json gs://josh-seldon/loan-default-calculator/models/risk/model-settings.json

# Deploy the Model to Seldon Deploy

In [102]:
from seldon_deploy_sdk import Configuration, EnvironmentApi, ApiClient, PipelinesApi, PredictApi, SeldonDeploymentsApi, ModelMetadataServiceApi, DriftDetectorApi, BatchJobsApi, BatchJobDefinition, OutlierDetectorApi, ModelsApi, Model, Pipeline
from seldon_deploy_sdk.auth import OIDCAuthenticator
from seldon_deploy_sdk.rest import ApiException
import pprint
import json
import requests
from requests.structures import CaseInsensitiveDict

In [103]:
SD_IP = "34.139.2.61"
config = Configuration()
config.host = f"https://{SD_IP}/seldon-deploy/api/v1alpha1"
config.oidc_server = f"https://{SD_IP}/auth/realms/deploy-realm"
config.oidc_client_id = "sd-api"
config.oidc_client_secret = "sd-api-secret"
# config.username = "admin"
# config.password = "12341234"
config.auth_method = "client_credentials"
config.verify_ssl=False

auth = OIDCAuthenticator(config)
config.id_token = auth.authenticate()

api_client = ApiClient(configuration=config, authenticator=auth)

env_api = EnvironmentApi(api_client)
user = env_api.read_user()

print(user)

{'email': '', 'groups': None, 'id': 'service-account-sd-api', 'name': ''}


# Pushing the Model to Production in Seldon Deploy Advanced

In [104]:
NAMESPACE = "seldon"
RISK_MODEL_NAME = "loan-risk"
# AMOUNT_MODEL_NAME = "loanamttest"
# TRANSFORMER_NAME = "riskcombiner"
PIPELINE_NAME = "loan-risk-pipeline"
RISK_MODEL_URI = "gs://josh-seldon/loan-default-calculator/models/risk"
# AMOUNT_MODEL_URI = "gs://josh-seldon/loan-default-calculator/models/amount"
# TRANSFORMER_URL = "gs://josh-seldon/loan-default-calculator/transformer"

## Step 1: deploy XGBoost Loan Risk Classifier model into Seldon Deploy

In [105]:
risk_model = {
   "kind":"Model",
   "metadata":{
      "name":f"{RISK_MODEL_NAME}",
      "namespace":NAMESPACE,
      "annotations":{
         "seldon.io/project":"default"
      }
   },
   "spec":{
      "storageUri":RISK_MODEL_URI,
      "requirements":[
         "xgboost"
      ],
      "replicas":1,
      "memory":"100Ki"
   }
}

risk_model

{'kind': 'Model',
 'metadata': {'name': 'loan-risk',
  'namespace': 'seldon',
  'annotations': {'seldon.io/project': 'default'}},
 'spec': {'storageUri': 'gs://josh-seldon/loan-default-calculator/models/risk',
  'requirements': ['xgboost'],
  'replicas': 1,
  'memory': '100Ki'}}

In [106]:
# Convert JSON to YAML
import yaml

risk_model_yaml = yaml.dump(json.loads(json.dumps(risk_model)))

print(risk_model_yaml)

# Save YAML to file
with open('load-default-calculator.yaml', 'w') as f:
    f.write(risk_model_yaml)

kind: Model
metadata:
  annotations:
    seldon.io/project: default
  name: loan-risk
  namespace: seldon
spec:
  memory: 100Ki
  replicas: 1
  requirements:
  - xgboost
  storageUri: gs://josh-seldon/loan-default-calculator/models/risk



In [107]:
model_client = ModelsApi(api_client)
try:
    api_response = model_client.create_model(NAMESPACE, risk_model, async_req=False)
    print(api_response)
except ApiException as e:
    print("Exception when calling ModelsApi->create_model: %s\n" % e)

{'api_version': None,
 'kind': None,
 'metadata': {'annotations': {'seldon.io/project': 'default'},
              'cluster_name': None,
              'creation_timestamp': '2023-04-07T14:48:30Z',
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': 1,
              'labels': None,
              'managed_fields': [{'api_version': 'mlops.seldon.io/v1alpha1',
                                  'fields_type': 'FieldsV1',
                                  'fields_v1': {'f:metadata': {'f:annotations': {'.': {},
                                                                                 'f:seldon.io/project': {}}},
                                                'f:spec': {'.': {},
                                                           'f:memory': {},
                                                           'f:replicas': {},
                   

In [108]:
!seldon model list

model			state		reason
-----			-----		------
income-classifier-model	ModelAvailable	
loan-risk-drift		ModelFailed	rpc error: code = InvalidArgument desc = mlserver.errors.MLServerError: Invalid configuration for model loan-risk-drift_1: 1 validation error for TabularDriftConfig
meta -> config_spec
  field required (type=value_error.missing)
incdrift	ModelAvailable	


In [None]:
try:
    api_response = model_client.delete_model(RISK_MODEL_NAME, NAMESPACE)
    print(api_response)
except ApiException as e:
    print("Exception when calling ModelsApi->delete_model: %s\n" % e)

## Step 2: Creating the Pipeline

In [None]:
# PIPELINE_NAME = "incomepipeline2"
pipeline_deployment = {
   "kind":"Pipeline",
   "metadata":{
      "name":PIPELINE_NAME,
      "namespace":NAMESPACE,
   },
   "spec":{
      "steps":[
         {
            "name":RISK_MODEL_NAME,
            "inputs" : [
               f"{PIPELINE_NAME}.inputs.{RISK_MODEL_NAME}"
            ]
         }
         # {
         #    "name":TRANSFORMER_NAME,
         #    "inputs" : [
         #       f"{RISK_MODEL_NAME}.outputs.predict",
         #       f"{PIPELINE_NAME}.inputs.{TRANSFORMER_NAME}"
         #    ]
         # }
         # },
         # {
         #    "name":AMOUNT_MODEL_NAME,
         #    "inputs" : [
         #       f"{TRANSFORMER_NAME}"
         #    ]
         # }
      ],
      "output":{
         "steps":[
            RISK_MODEL_NAME
         ]
      }
   }
}
pipeline_deployment

In [None]:
# Convert JSON to YAML
pipeline_yaml = yaml.dump(json.loads(json.dumps(pipeline_deployment)))

print(pipeline_yaml)

# Save YAML to file
with open('load-default-calculator-pipeline.yaml', 'w') as f:
    f.write(pipeline_yaml)

In [None]:
pipeline_api = PipelinesApi(api_client)
try:
    api_response = pipeline_api.create_pipeline(NAMESPACE, pipeline_deployment)
    print(api_response)
    print(f"Resource Version Number: ")
except ApiException as e:
    print("Exception when calling PipelinesApi->create_pipeline: %s\n" % e)

In [None]:
!seldon pipeline list

In [None]:
try:
    api_response = pipeline_api.delete_pipeline(PIPELINE_NAME, NAMESPACE)
    print(api_response)
except ApiException as e:
    print("Exception when calling ModelsApi->delete_model: %s\n" % e)

### Send Test Request To Seldon Deploy Advanced Pipeline

In [92]:
import requests
import json
from mlserver.codecs import PandasCodec, NumpyCodec

# MODEL_NAME = "loancalulator"
SD_IP = "35.243.251.120"
single_prediction = X_risk_test.sample(n=1)
single_prediction_np = single_prediction.values
risk_prediction = NumpyCodec.encode_input(RISK_MODEL_NAME, payload=single_prediction_np)
risk_prediction_dict = risk_prediction.dict()


# single_amount_prediction = X_loan_amount_test.sample(n=1)
# single_amount_prediction_np = single_amount_prediction.values
# amount_prediction = NumpyCodec.encode_input(AMOUNT_MODEL_NAME, payload=single_amount_prediction_np)
# amount_prediction_dict = amount_prediction.dict()

# test_output_risk = {
#           "data": [
#             0
#           ],
#           "name": "predict",
#           "shape": [
#             1,
#             1
#           ],
#           "datatype": "INT64"
#         }

prediction = {
    "inputs": [
        risk_prediction_dict
        
    ]
}

print(json.dumps(prediction, indent=2))

headers = {
    "Seldon-Model" : f"{PIPELINE_NAME}.pipeline",
    "Host" : "seldon.inference.seldon",
    "Content-Type" : "application/json"
}

URL = f"https://{SD_IP}/v2/models/{PIPELINE_NAME}/infer"

r = requests.post(URL, headers=headers, json=prediction, verify=False)
print(r)
print(json.dumps(r.json(),indent=2))


{
  "inputs": [
    {
      "name": "loan-risk",
      "shape": [
        1,
        12
      ],
      "datatype": "INT64",
      "parameters": {
        "content_type": "np"
      },
      "data": [
        0,
        0,
        4,
        0,
        2,
        20,
        2,
        2,
        44,
        1,
        1,
        64907
      ]
    }
  ]
}
<Response [200]>
{
  "model_name": "",
  "outputs": [
    {
      "data": [
        1
      ],
      "name": "predict",
      "shape": [
        1,
        1
      ],
      "datatype": "INT64"
    }
  ]
}


### Send some requests through kafka

In [None]:
KAFKA_URI = '35.243.202.138:9094'

In [None]:
import mlserver.grpc.converters as converters
from mlserver.types import InferenceRequest
import mlserver.grpc.dataplane_pb2 as dataplane
from mlserver.types import InferenceRequest

from typing import Any


def serialize_inference_request(msg: Any) -> bytes:
    inference_request = InferenceRequest.parse_obj(msg)

    inference_request_g = converters.ModelInferRequestConverter.from_types(
        inference_request, model_name=None, model_version=None
    )

    return inference_request_g.SerializeToString()


def deserialize_inference_request(msg: bytes) -> Any:
    inference_request_g = dataplane.ModelInferRequest.FromString(msg)
    inference_request = converters.ModelInferRequestConverter.to_types(inference_request_g)
    return inference_request.dict()   

### Producer

In [None]:
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
import mlserver.grpc.converters as converters
from mlserver.types import InferenceRequest
import json
import socket
import uuid
import mlserver.grpc.converters as converters
import mlserver.grpc.dataplane_pb2 as dataplane
import random

conf = {
    'bootstrap.servers': KAFKA_URI,
    'client.id': socket.gethostname(),
    'group.id': "foo7"
}

output_topic = f"seldon.seldon.pipeline.{PIPELINE_NAME}.outputs"
input_topic = f"seldon.seldon.pipeline.{PIPELINE_NAME}.inputs"
headers = {"pipeline": PIPELINE_NAME}


producer = Producer(conf)
for x in range(100):
    try:
        single_prediction = X_risk_test.sample(n=1, replace=False, random_state=42)
        single_prediction_np = single_prediction.values
        prediction = NumpyCodec.encode_input(RISK_MODEL_NAME, payload=single_prediction_np)
        prediction_dict = prediction.dict()


        prediction = {
            "inputs": [
            prediction_dict
            ]
        }

        # print(prediction)

        msg = converters.ModelInferRequestConverter.from_types(
            InferenceRequest.parse_obj(prediction), model_name=None, model_version=None
        ).SerializeToString()
        print(msg)
        producer.produce(input_topic, msg, key=str(
            uuid.uuid4()), headers=headers)
        producer.flush()
        print('Successfully connected to Kafka server')
    except Exception as e:
        print('ERROR: Failed to connect to Kafka server: {}'.format(e))

# View that the input topic has reached kafka


In [None]:
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
from confluent_kafka import Consumer, KafkaError, TopicPartition

import json
import socket

conf = {'bootstrap.servers': '35.243.202.138:9094',
        'client.id': socket.gethostname(),
        'group.id': "fook9",
        'auto.offset.reset': 'smallest'
        }


output_topic = "seldon.seldon.pipeline.loancalculatorpipeline.outputs"
input_topic = "seldon.seldon.pipeline.loancalculatorpipeline.inputs"
headers = {"pipeline": "loancalculatorpipeline"}

consumer = Consumer(conf)

# Assign the topic, partition, and offset to consume from
consumer.assign([TopicPartition(output_topic, 0, 10)])

num_messages = 10
messages_consumed = 0

while messages_consumed < num_messages:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition event')
        else:
            print('Error while consuming message: {}'.format(msg.error()))
    else: 
        value = msg.value()
        # create a new instance of your protobuf message
        # my_message = message()
        # deserialize the message from a binary string
        # json_format.ParseFromString(value)
        prediction_output = deserialize_inference_request(value)
        print(prediction_output)
        # print(f'Received message with format type: {type(value)}')
        messages_consumed += 1
consumer.close()


# Anchors Explainer for the Loan Risk Classifier

In [44]:
from alibi.explainers import AnchorTabular

predict_fn = lambda x: risk_model.predict(x)
feature_names = risk_feature_names
explainer = AnchorTabular(predict_fn, feature_names, seed=1)

In [43]:
len(feature_names)

12

In [45]:
if X_risk_train.isnull().any().any():
    print('DataFrame has missing values')
else:
    print('DataFrame has no missing values')

X_risk_train.shape

DataFrame has no missing values


(1636, 12)

In [46]:
X_train_np = X_risk_train.values
X_test_np = X_risk_test.values

In [47]:
explainer.fit(X_train_np, disc_perc=(25, 50, 75))

AnchorTabular(meta={
  'name': 'AnchorTabular',
  'type': ['blackbox'],
  'explanations': ['local'],
  'params': {'seed': 1, 'disc_perc': (25, 50, 75)},
  'version': '0.9.1'}
)

In [48]:
class_names = ['Low', 'Medium', 'High']

In [50]:
idx = 400
print('Prediction: ', class_names[explainer.predictor(X_test_np[idx].reshape(1, -1))[0]])

Prediction:  High


In [51]:
explanation = explainer.explain(X_test_np[idx], threshold=0.95)
print('Anchor: %s' % (' AND '.join(explanation.anchor)))
print('Precision: %.2f' % explanation.precision)
print('Coverage: %.2f' % explanation.coverage)


Could not find an anchor satisfying the 0.95 precision constraint. Now returning the best non-eligible result. The desired precision threshold might not be achieved due to the quantile-based discretisation of the numerical features. The resolution of the bins may be too large to find an anchor of required precision. Consider increasing the number of bins in `disc_perc`, but note that for some numerical distribution (e.g. skewed distribution) it may not help.


Anchor: age <= 33.00 AND 2.00 < personal_status <= 3.00 AND job > 2.25 AND credit_history <= 1.00 AND 46555.25 < existing_loan_amounts <= 59743.00 AND installment_rate <= 18.00 AND employment > 3.00 AND savings_account_status > 2.00 AND other_debtors <= 1.00 AND loan_purpose > 1.00
Precision: 0.88
Coverage: 0.00


In [52]:
explainer.save("./explainer/loan-risk-explainer")

In [53]:
%%writefile ./explainer/loan-risk-explainer/model-settings.json
{
  "implementation": "mlserver_alibi_explain.AlibiExplainRuntime"
}

Overwriting ./explainer/loan-risk-explainer/model-settings.json


In [54]:
!gsutil cp -r ./explainer gs://josh-seldon/loan-default-calculator/loan-risk-explainer

Copying file://./explainer/loan-risk-explainer/model-settings.json [Content-Type=application/json]...
Copying file://./explainer/loan-risk-explainer/explainer.dill [Content-Type=application/octet-stream]...
==> NOTE: You are uploading one or more large file(s), which would run          
significantly faster if you enable parallel composite uploads. This
feature can be enabled by editing the
"parallel_composite_upload_threshold" value in your .boto
configuration file. However, note that if you do this large files will
be uploaded as `composite objects
<https://cloud.google.com/storage/docs/composite-objects>`_,which
means that any user who downloads such objects will need to have a
compiled crcmod installed (see "gsutil help crcmod"). This is because
without a compiled crcmod, computing checksums on composite objects is
so slow that gsutil disables downloads of composite objects.

Copying file://./explainer/loan-risk-explainer/meta.dill [Content-Type=application/octet-stream]...
\ [3 fi

In [55]:
NAMESPACE = "seldon"
# MODEL_NAME = "loancalulator"
# PIPELINE_NAME = "loancalculatorpipeline"

EXPLAIENR_URL = "gs://josh-seldon/loan-default-calculator/loan-risk-explainer/explainer/loan-risk-explainer"
EXPLAINER_NAME = f"{RISK_MODEL_NAME}-explainer"

In [56]:
!seldon model list

model			state		reason
-----			-----		------
incdrift		ModelAvailable	
loan-risk		ModelAvailable	
income-classifier-model	ModelAvailable	


## Deploy Explainer Loan default calculator model into Seldon Deploy

In [57]:
explainer_config = {
   "kind":"Model",
   "metadata":{
      "name":EXPLAINER_NAME,
      "namespace":NAMESPACE,
      "annotations":{
         "seldon.io/project":"default"
      },
      "labels":{
         "seldon.io/explainer": "true",
         "seldon.io/pipeline": PIPELINE_NAME
      }
   },
   "spec":{
      "storageUri":EXPLAIENR_URL,
      "explainer" : {
         "type": "anchor_tabular",
         "modelRef": RISK_MODEL_NAME
      },
      "requirements":[
         "alibi-explain"
      ],
      "replicas":1,
      "memory":"100Ki"
   }
}

explainer_config

{'kind': 'Model',
 'metadata': {'name': 'loan-risk-explainer',
  'namespace': 'seldon',
  'annotations': {'seldon.io/project': 'default'},
  'labels': {'seldon.io/explainer': 'true',
   'seldon.io/pipeline': 'loan-risk-pipeline'}},
 'spec': {'storageUri': 'gs://josh-seldon/loan-default-calculator/loan-risk-explainer/explainer/loan-risk-explainer',
  'explainer': {'type': 'anchor_tabular', 'modelRef': 'loan-risk'},
  'requirements': ['alibi-explain'],
  'replicas': 1,
  'memory': '100Ki'}}

In [58]:
model_client = ModelsApi(api_client)
try:
    api_response = model_client.create_model(NAMESPACE, explainer_config, async_req=False)
    print(api_response)
except ApiException as e:
    print("Exception when calling ModelsApi->create_model: %s\n" % e)

{'api_version': None,
 'kind': None,
 'metadata': {'annotations': {'seldon.io/project': 'default'},
              'cluster_name': None,
              'creation_timestamp': '2023-04-07T13:44:22Z',
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': 1,
              'labels': {'seldon.io/explainer': 'true',
                         'seldon.io/pipeline': 'loan-risk-pipeline'},
              'managed_fields': [{'api_version': 'mlops.seldon.io/v1alpha1',
                                  'fields_type': 'FieldsV1',
                                  'fields_v1': {'f:metadata': {'f:annotations': {'.': {},
                                                                                 'f:seldon.io/project': {}},
                                                               'f:labels': {'.': {},
                                                           

In [64]:
!seldon model list

model			state		reason
-----			-----		------
loan-risk		ModelAvailable	
income-classifier-model	ModelAvailable	
incdrift		ModelAvailable	


In [63]:
try:
    api_response = model_client.delete_model(EXPLAINER_NAME, NAMESPACE)
    print(api_response)
except ApiException as e:
    print("Exception when calling ModelsApi->delete_model: %s\n" % e)

None


#  Create and Deploy a Drift Detector

## Create a Tabular Drift Detector on the Loan Risk Classifier

In [70]:
drift_cat_map = {
0: None,
 1: None,
 2: None,
 3: None,
 4: None,
 6: None,
 7: None,
 9: None,
 10: None,
 11: None
}

In [71]:
from alibi_detect.cd import TabularDrift

X_train_np = X_risk_train.values
X_test_np = X_risk_test.values

cd = TabularDrift(X_train_np, p_val=0.05, categories_per_feature=drift_cat_map)

## Load the Drift Dataset CSV file

In [75]:
df_drift = pd.read_csv('drifted_credit_data.csv')

# Take a random subset of the data
subset = df_drift.sample(n=25)

risk_data = subset[['checking_account_status', 'savings_account_status', 'credit_history', 'employment', 'loan_purpose', 'installment_rate', 'personal_status', 'other_debtors', 'age', 'job', 'dependents', 'risk','existing_loan_amounts']]

# Convert the subset to a numpy array
X = risk_data.values

risk_data.head()

Unnamed: 0,checking_account_status,savings_account_status,credit_history,employment,loan_purpose,installment_rate,personal_status,other_debtors,age,job,dependents,risk,existing_loan_amounts
797,1,3,4,0,2,13,0,1,48,2,2,0,60034
972,1,2,2,1,1,32,2,2,44,1,1,2,35607
753,3,0,1,1,3,19,2,1,4,1,1,1,55901
891,1,0,4,2,0,35,1,1,31,0,2,2,64451
168,2,1,1,1,3,32,4,0,20,3,2,2,61379


In [76]:
preds = cd.predict(X, drift_type='batch', return_p_val=True, return_distance=True)
preds

{'data': {'is_drift': 0,
  'distance': array([1.87660158e+00, 3.11167598e+00, 5.42213058e+00, 3.47014713e+00,
         4.42329168e-01, 1.10611245e-01, 5.19603825e+00, 1.86070538e+00,
         9.80440080e-02, 4.50963116e+00, 2.89135695e+00, 1.66100000e+03],
        dtype=float32),
  'p_val': array([0.59840906, 0.37472537, 0.24665953, 0.482432  , 0.9788669 ,
         0.8869241 , 0.26776767, 0.3944146 , 0.9507276 , 0.21143283,
         0.08905589, 0.23931743], dtype=float32),
  'threshold': 0.004166666666666667},
 'meta': {'name': 'TabularDrift',
  'online': False,
  'data_type': None,
  'version': '0.11.0',
  'detector_type': 'drift'}}

In [77]:
from alibi_detect.saving import save_detector, load_detector

# we can also save/load an initialised detector
filepath = './drift'  # change to directory where detector is saved
save_detector(cd, filepath)
# cd = load_detector(filepath)

In [78]:
%%writefile ./drift/model-settings.json
{
  "implementation": "mlserver_alibi_detect.AlibiDetectRuntime"
}

Overwriting ./drift/model-settings.json


In [79]:
!gsutil cp -r ./drift gs://josh-seldon/loan-default-calculator/risk/drift/

Copying file://./drift/model-settings.json [Content-Type=application/json]...
Copying file://./drift/x_ref.npy [Content-Type=application/octet-stream]...     
Copying file://./drift/config.toml [Content-Type=application/octet-stream]...   
\ [3 files][154.0 KiB/154.0 KiB]                                                
Operation completed over 3 objects/154.0 KiB.                                    


In [80]:
DRIFT_NAME = f"{RISK_MODEL_NAME}-drift"
DRIFT_URL = "gs://josh-seldon/loan-default-calculator/risk/drift/drift"

In [81]:
drift_config = {
   "kind":"Model",
   "metadata":{
      "name":DRIFT_NAME,
      "namespace":NAMESPACE,
      "annotations":{
         "seldon.io/project":"default"
      },
      "labels":{
         "seldon.io/detector-type": "drift",
         "seldon.io/pipeline": PIPELINE_NAME
      }
   },
   "spec":{
      "storageUri":DRIFT_URL,
      "requirements":[
         "alibi-detect",
         "mlserver"
      ],
      "replicas":1,
      "memory":"100Ki"
   }
}

drift_config

{'kind': 'Model',
 'metadata': {'name': 'loan-risk-drift',
  'namespace': 'seldon',
  'annotations': {'seldon.io/project': 'default'},
  'labels': {'seldon.io/detector-type': 'drift',
   'seldon.io/pipeline': 'loan-risk-pipeline'}},
 'spec': {'storageUri': 'gs://josh-seldon/loan-default-calculator/risk/drift/drift',
  'requirements': ['alibi-detect', 'mlserver'],
  'replicas': 1,
  'memory': '100Ki'}}

In [82]:
# Convert JSON to YAML
import yaml

drift_config_yaml = yaml.dump(json.loads(json.dumps(drift_config)))

print(drift_config_yaml)

# Save YAML to file
with open('load-default-calculator-drift.yaml', 'w') as f:
    f.write(drift_config_yaml)

kind: Model
metadata:
  annotations:
    seldon.io/project: default
  labels:
    seldon.io/detector-type: drift
    seldon.io/pipeline: loan-risk-pipeline
  name: loan-risk-drift
  namespace: seldon
spec:
  memory: 100Ki
  replicas: 1
  requirements:
  - alibi-detect
  - mlserver
  storageUri: gs://josh-seldon/loan-default-calculator/risk/drift/drift



In [83]:
model_client = ModelsApi(api_client)
try:
    api_response = model_client.create_model(NAMESPACE, drift_config, async_req=False)
    print(api_response)
except ApiException as e:
    print("Exception when calling ModelsApi->create_model: %s\n" % e)

{'api_version': None,
 'kind': None,
 'metadata': {'annotations': {'seldon.io/project': 'default'},
              'cluster_name': None,
              'creation_timestamp': '2023-04-07T13:52:46Z',
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': 1,
              'labels': {'seldon.io/detector-type': 'drift',
                         'seldon.io/pipeline': 'loan-risk-pipeline'},
              'managed_fields': [{'api_version': 'mlops.seldon.io/v1alpha1',
                                  'fields_type': 'FieldsV1',
                                  'fields_v1': {'f:metadata': {'f:annotations': {'.': {},
                                                                                 'f:seldon.io/project': {}},
                                                               'f:labels': {'.': {},
                                                      

In [84]:
!seldon model list

model			state		reason
-----			-----		------
loan-risk		ModelAvailable	
income-classifier-model	ModelAvailable	
incdrift		ModelAvailable	
loan-risk-drift		ModelFailed	rpc error: code = InvalidArgument desc = mlserver.errors.MLServerError: Invalid configuration for model loan-risk-drift_1: 1 validation error for TabularDriftConfig
meta -> config_spec
  field required (type=value_error.missing)


In [None]:
try:
    api_response = model_client.delete_model(DRIFT_NAME, NAMESPACE)
    print(api_response)
except ApiException as e:
    print("Exception when calling ModelsApi->delete_model: %s\n" % e)

In [None]:
# PIPELINE_NAME = "incomepipeline2"
pipeline_w_drift = {
   "kind":"Pipeline",
   "metadata":{
      "name":PIPELINE_NAME,
      "namespace":NAMESPACE,
      "resourceVersion":"16409761"
   },
   "spec":{
      "steps":[
         {
            "name":RISK_MODEL_NAME
         },
         {
            "name": f"{RISK_MODEL_NAME}-drift",
            "batch" : {
                "size" : 150
            }
         }
      ],
      "output":{
         "steps":[
            RISK_MODEL_NAME
         ]
      }
   }
}
pipeline_w_drift

In [None]:
# Convert JSON to YAML
import yaml

pipeline_w_drift_yaml = yaml.dump(json.loads(json.dumps(pipeline_w_drift)))

print(pipeline_w_drift_yaml)

# Save YAML to file
with open('loan-default-calculator-drift-pipeline.yaml', 'w') as f:
    f.write(pipeline_w_drift_yaml)

In [None]:
pipeline_api = PipelinesApi(api_client)
try:
    api_response = pipeline_api.update_pipeline(PIPELINE_NAME, NAMESPACE, pipeline_w_drift)
    print(api_response)
except ApiException as e:
    print("Exception when calling PipelinesApi->update_pipeline: %s\n" % e)


## KernalShap Explainer

In [None]:
import shap
shap.initjs()

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from alibi.explainers import KernelShap
from alibi.datasets import fetch_adult
from scipy.special import logit
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# KS Drift

In [None]:
from alibi_detect.cd import KSDrift

cd = KSDrift(X_train_np, p_val=0.05)

In [None]:
preds = cd.predict(X, drift_type='batch', return_p_val=True, return_distance=True)


In [None]:
preds

## MM Drift Detector

In [None]:
from alibi_detect.cd import MMDDrift
# import tensorflow as tf
# tf.keras.backend.clear_session()
# from tensorflow.keras.layers import Dense, InputLayer



cd_torch = MMDDrift(X_train_np, backend='pytorch', p_val=.05)


# Outlier

## VAE Outlier

### Apply One Hot Encoding

In [None]:
X_train

In [None]:
from sklearn.preprocessing import OneHotEncoder

# Create dataframe of only categorical features
cat_data = df[['checking_account_status', 'credit_history', 'employment']]


# Create instance of OneHotEncoder class
encoder = OneHotEncoder(categories='auto')

# Fit the encoder to the categorical data
encoder.fit(cat_data)

# Transform the categorical features into one-hot encoded features
onehot_cat_data = encoder.transform(cat_data).toarray()

# Combine the one-hot encoded categorical features with the numerical features
num_data = df[['duration', 'credit_amount', 'risk']]
encoded_data = pd.DataFrame(np.concatenate([onehot_cat_data, num_data], axis=1))

# Print the first few rows of the encoded data
print(encoded_data.head())

In [None]:
import pandas as pd
import numpy as np
from alibi_detect.od import OutlierVAE
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.optimizers import Adam

# Load the data
data = pd.read_csv('credit_data.csv')

# Separate the features from the target variable
X = data.drop('risk', axis=1).values

# Standardize the feature matrix
scaler = StandardScaler().fit(X)
X = scaler.transform(X)

# Define the VAE parameters
latent_dim = 8
encoder_layers = [128, 64]
decoder_layers = [64, 128]
batch_size = 64
epochs = 30

# Initialize the VAE model
vae = OutlierVAE(
    threshold=0.05,
    latent_dim=latent_dim,
    encoder_net=encoder_layers,
    decoder_net=decoder_layers,
    score_type='mse'
)


In [None]:
seldon.seldon-gitops.model.wed.outputs

## Consume Drift 

In [None]:
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
from confluent_kafka import Consumer, KafkaError, TopicPartition

import json
import socket

conf = {'bootstrap.servers': '35.243.202.138:9094',
        'client.id': socket.gethostname(),
        'group.id': "fook9",
        'auto.offset.reset': 'smallest'
        }


output_topic = "seldon.seldon.model.loan-drift.outputs"
headers = {"pipeline": "loancalculatorpipeline"}

consumer = Consumer(conf)

# Assign the topic, partition, and offset to consume from
consumer.assign([TopicPartition(output_topic, 0, 10)])

num_messages = 10
messages_consumed = 0

while messages_consumed < num_messages:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition event')
        else:
            print('Error while consuming message: {}'.format(msg.error()))
    else: 
        value = msg.value()
        # create a new instance of your protobuf message
        # my_message = message()
        # deserialize the message from a binary string
        # json_format.ParseFromString(value)
        prediction_output = deserialize_inference_request(value)
        print(json.dumps(prediction_output,indent=2))
        # print(f'Received message with format type: {type(value)}')
        messages_consumed += 1
consumer.close()


## Add a lot of Features

In [None]:
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
import mlserver.grpc.converters as converters
from mlserver.types import InferenceRequest
from mlserver.codecs import NumpyCodec
import json
import socket
import uuid
import mlserver.grpc.converters as converters
import mlserver.grpc.dataplane_pb2 as dataplane
import random





producer = Producer(conf)
for x in range(10):
    try:
        single_prediction = X_test.sample(n=1, replace=False, random_state=42)
        single_prediction_np = single_prediction.values
        prediction = NumpyCodec.encode_input("unecessary", payload=single_prediction_np)
        prediction_dict = prediction.dict()


        prediction = {
            "inputs": [
                prediction_dict
            ]
        }

        print(prediction)
        msg = converters.ModelInferRequestConverter.from_types(
            InferenceRequest.parse_obj(prediction), model_name=None, model_version=None
        ).SerializeToString()
        producer.produce(input_topic, msg, key=str(
            uuid.uuid4()), headers=headers)
        producer.flush()
        print('Successfully connected to Kafka server')
    except Exception as e:
        print('ERROR: Failed to connect to Kafka server: {}'.format(e))

# View that the input topic has reached kafka


# Expose the Kafka for funsies

In [None]:
%%writefile kafka.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: seldon
  namespace: kafka
spec:
  entityOperator:
    userOperator: {}
  kafka:
    replicas: 3
    version: 3.3.1
    config:
      auto.create.topics.enable: true
      default.replication.factor: 1
      inter.broker.protocol.version: 3.3
      min.insync.replicas: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    listeners:
    - name: plain
      port: 9092
      tls: false
      type: internal
    - name: external
      port: 9094
      type: loadbalancer
      tls: false
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral

In [None]:
!kubectl apply -f kafka.yaml