# Insurance claims demo with SCv2

This notebook implements a simple insurance claims processing pipeline, while showcase a number of SCv2 features.

First off, make sure you have Seldon Core v2 up and running. For instructions please refer to the README.me found in this repo.

We will be using Seldon CLI a lot, let's check it.

In [1]:
!which seldon

/home/andrei/projects/seldon-core/operator/bin/seldon


## Stage 1 - basic pipeline

In the first part of the demo we will implement the basic pipeline as displayed here

![insurance claims pipeline](https://raw.githubusercontent.com/mlatcl/fbp-vs-soa/main/insurance_claims/diagrams/insurance_claims_fbp_min.png?raw=true)

As you can see, the pipeline takes in a claim, classifies it (in several steps) as simple and complex, and generates corresponding payout to the claimant. All circles here represent business logic nodes, while rectangles are data streams that hold input, output and interim data. Because of its dataflow stream-base nature, SCv2 is an ideal tool to build such data processing pipelines.

We have implemented each step of the pipeline as a custom Python code wrapped as a Triton model. Their source code and specs can be inspected in the [models](./models) folder in this repo. For example, here are their yaml definitions.

In [2]:
!cat ./models/insurance_claims/calculate_claim_value.yaml
print("----------------------------------------------------------------------")
!cat ./models/insurance_claims/classify_claim_value.yaml
print("----------------------------------------------------------------------")
!cat ./models/insurance_claims/classify_claim_complexity.yaml
print("----------------------------------------------------------------------")
!cat ./models/insurance_claims/calculate_complex_claim_payout.yaml
print("----------------------------------------------------------------------")
!cat ./models/insurance_claims/calculate_simple_claim_payout.yaml


apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: calculate_claim_value
  namespace: seldon-mesh
spec:
  storageUri: "/mnt/models/insurance_claims/calculate_claim_value"
  requirements:
  - triton
  - python
----------------------------------------------------------------------
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: classify_claim_value
  namespace: seldon-mesh
spec:
  storageUri: "/mnt/models/insurance_claims/classify_claim_value"
  requirements:
  - triton
  - python
----------------------------------------------------------------------
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: classify_claim_complexity
  namespace: seldon-mesh
spec:
  storageUri: "/mnt/models/insurance_claims/classify_claim_complexity"
  requirements:
  - triton
  - python
----------------------------------------------------------------------
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: calculate_complex_claim_payout
  namesp

Notice how in the yaml file we specify path that spells `...models/insurance_claims/...`. This path agrees with the local path SCv2 was given when it was launched.

Now we can deploy all the models with Seldon CLI.

In [3]:
!seldon model load -f ./models/insurance_claims/calculate_claim_value.yaml
!seldon model load -f ./models/insurance_claims/classify_claim_value.yaml
!seldon model load -f ./models/insurance_claims/classify_claim_complexity.yaml
!seldon model load -f ./models/insurance_claims/calculate_complex_claim_payout.yaml
!seldon model load -f ./models/insurance_claims/calculate_simple_claim_payout.yaml

{}
{}
{}
{}
{}


Similarly, the SCv2 pipeline can be found in the [pipelines](./pipelines) folder. We can output and inspect it here, as it has a number of interesting features.

In [4]:
!cat ./pipelines/insurance_claims.yaml

apiVersion: mlops.seldon.io/v1alpha1
kind: Pipeline
metadata:
  name: insurance_claims
  namespace: seldon-mesh
spec:
  steps:
    - name: calculate_claim_value
      inputs:
        - insurance_claims.inputs.total_claim_amount
    - name: classify_claim_value
      inputs:
        - calculate_claim_value.outputs.claim_value
    - name: classify_claim_complexity
      inputs:
        - insurance_claims.inputs.total_claim_amount
        - insurance_claims.inputs.auto_year
        - insurance_claims.inputs.witnesses
        - insurance_claims.inputs.police_report_available
      triggers:
      - classify_claim_value.outputs.is_low_value_claim
    - name: calculate_simple_claim_payout
      inputs:
      - insurance_claims.inputs.total_claim_amount
      - insurance_claims.inputs.claim_id
      triggers:
      - classify_claim_complexity.outputs.is_simple_claim
    - name: calculate_complex_claim_payout
      inputs:
      - insurance_claims.inputs.total_claim_amount
      - insurance_cl

First, note how SCv2 allows us to use pipeline input as an input to any of the pipeline steps. SCv2 allows great flexibility, as any datasteam can be re-use as input of any step.

Second, the pipeline uses a lot of triggers. Triggers are special inputs that are not passed to the node themselves, but their presence triggers computation. Triggers can be joined, for example in this pipeline we use `any` join, meanining the computation will be triggered if any of the trigger inputs is present.

Third, the pipeline merges outputs together. We have a stream of simple and a stream of complex payouts, and the we merge them into a single pipeline output stream.

Now the pipeline can be loaded.

In [5]:
!seldon pipeline load -f ./pipelines/insurance_claims.yaml

{}


We can ensure that models and pipelines are available for us. Here we check their status, which is returned as JSON, and search for specific fields that show their status.

In [6]:
!seldon model status calculate_claim_value | jq -M . | grep state

          "state": "Available",
      "state": {
        "state": "ModelAvailable",


In [7]:
!seldon pipeline status insurance_claims | jq -M . | grep status

        "status": "PipelineReady",


Once the pipeline and the models are ready for use, we can issue inference requests to the pipeline. Our input is a CSV file with insurance claims, so we will load it, one row at a time, and form a request JSON string. We can then use CLI to send the request.

Below is the helper function to build such request strings. There are actually two utility functions there, as we will be using another one later on in this demo.

In [8]:
import json
import pandas as pd
import base64

df = pd.read_csv("insurance_claims.csv")

df = df.drop(columns=['incident_date', 'policy_bind_date'])

pandas_request_types_map = {
    "int64": "INT64",
    "float64": "FP64",
    "object": "BYTES"
}

def get_triton_request_string(claim_index):
    records = []
    for column_name, column_type in zip(df.columns, df.dtypes):
        if not str(column_type) in pandas_request_types_map:
            continue

        request_type = pandas_request_types_map[str(column_type)]
        data = df.iloc[claim_index:claim_index+1, :][column_name].tolist()

        content = {
            "name": column_name,
            "contents": {request_type.lower() + "_contents": data},
            "datatype": request_type,
            "shape": [1]
        }

        if column_type == "object":
            content["contents"]["bytes_contents"] = [base64.b64encode(x.encode()).decode() for x in content["contents"]["bytes_contents"]]

        records.append(content)

    claim_id = {"name": "claim_id", "contents": {"int64_contents": [claim_index]}, "datatype": "INT64", "shape": [1]}
    records.append(claim_id)
    request = {
        "model_name": "does-this-matter?",
        "inputs": records
    }

    request_string = json.dumps(request)
    return request_string

def get_mlserver_request_string(claim_index):
    records = []
    for column_name, column_type in zip(df.columns, df.dtypes):
        if not str(column_type) in pandas_request_types_map:
            continue

        request_type = pandas_request_types_map[str(column_type)]
        data = df.iloc[claim_index:claim_index+1, :][column_name].tolist()

        content = {
            "name": column_name,
            "datatype": request_type,
            "shape": [1],
            "contents": {request_type.lower() + "_contents": data},
        }

        if column_type == "object":
            content["contents"]["bytes_contents"] = [base64.b64encode(x.encode()).decode() for x in content["contents"]["bytes_contents"]]

        records.append(content)

    claim_id = {"name": "claim_id", "contents": {"int64_contents": [claim_index]}, "datatype": "INT64", "shape": [1]}
    records.append(claim_id)
    request = {
        "parameters": {"content_type": {"string_param": "pd"}},
        "model_name": "insurance_claims_classifier_1",
        "inputs": records
    }

    request_string = json.dumps(request)
    return request_string


We can now send a couple of requests. Response has a handful of information, but for brevity we will only show the numeric output, which is the calculated payout for a given claim.

In [9]:
for i in range(10):
    request_string = get_triton_request_string(i)
    !seldon pipeline infer insurance_claims --inference-mode grpc '{request_string}' | jq . | grep -A 1 "fp64Contents" | grep -v "fp64Contents"

          42966
          3042
          27720
          38040
          5200
          38460
          47190
          41272
          22160
          25380


## Stage 2 - data collection

In the second part of the demo we will emulate a data collection scenario. Suppose we decided to replace the classification part of the pipeline with a simple ML classifier. How can we ago about doing this with SCv2?

To train a model, we need to collect a dataset of insurance claims and their complexity. Since SCv2 stores all intermediate computations as data streams, doing this becomes as simple as reading the data off few Kafka streams.

We will be reading data from three points in the inference graph:
1. Pipeline input stream - that will be the dataset inputs
2. Simple claims input stream - labels for simple claims
3. Complex claims input stream - labels for complex claims


For the purposes of this demo we will use Seldon CLI `inspect` to read the data from the streams, and we start by defining a few utility functions that prcess its outputs. Depending on the use case, you may want to use different tools, such as Kafka clients, to do the same.

In [10]:
def parse_complexity_inspect(lines):
    # parses output of
    # !seldon pipeline inspect --offset 100 insurance_claims.calculate_complex_claim_payout.inputs

    # we only need to collect keys for each record, as mere presence of a record in a stream indicates its complexity
    requests = [line.split("\t", 2)[1] for line in lines]
    return requests


def parse_pipeline_inputs_inspect(lines):
    # parses output of
    # !seldon pipeline inspect --offset 100 insurance_claims.inputs
    records = []
    for line in lines:
        tokens = line.split("\t", 2)
        request_id = tokens[1]
        request_body = json.loads(tokens[2])

        input_record = {}
        input_record["request_id"] = request_id
        for fields in request_body["inputs"]:
            value = next(iter(fields["contents"].values()))[0]
            if fields["datatype"] == "INT64":
                value = int(value)
            elif fields["datatype"] == "FP64":
                value = float(value)
            elif fields["datatype"] == "BYTES":
                value = base64.b64decode(value).decode('utf-8')
            input_record[fields["name"]] = value
        
        records.append(input_record)

    return records


We can now read the data and build a dataset.

In [11]:
lines = !seldon pipeline inspect --offset 100 insurance_claims.calculate_complex_claim_payout.inputs
complex_requests = parse_complexity_inspect(lines)


lines = !seldon pipeline inspect --offset 100 insurance_claims.calculate_simple_claim_payout.inputs
simple_requests = parse_complexity_inspect(lines)

In [12]:
lines = !seldon pipeline inspect --offset 100 insurance_claims.inputs
pipeline_records = parse_pipeline_inputs_inspect(lines)

In [13]:
for record in pipeline_records:
    request_id = record["request_id"]
    if request_id in complex_requests:
        record["is_complex"] = True
    elif request_id in simple_requests:
        record["is_complex"] = False
    else:
        raise ValueError(f"Request with ID {request_id} seems lost!")

In [14]:
claim_complexity_df = pd.DataFrame(pipeline_records)
claim_complexity_df = claim_complexity_df.drop(["request_id"], axis='columns')

Next step is to build a classification model. We will be using simple Decision Tree classifier from Sciki-learn, and deploy it to MLServer in SCv2. Importantly, to ensure smooth deployment we need to use the same version of Scikit-learn as the one running on the MLServer, which at the moment of writing is 1.1.2.

In [15]:
# !pip install scikit-learn==1.2.0

!pip list | grep scikit-learn

scikit-learn             1.2.0   


Notice that some of our features are strings, while others are numeric. String features have to be encoded, e.g. with one-hot encoding. This can be a separate transformation step in the SCv2 pipeline, but here we will define a pipeline with Scikit-learn instead.

In [16]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

X = claim_complexity_df.drop("is_complex", axis=1)
y = claim_complexity_df["is_complex"]

# some columns in the dataset are objects and need to be encoded
categorical_features = [column_name for column_name in X.columns if X[column_name].dtype == object]
categorical_transformer = OneHotEncoder(handle_unknown="ignore")

# other columns are numerical and thus can be just passed through
numerical_features = [column_name for column_name in X.columns if X[column_name].dtype != object]


preprocessor = ColumnTransformer(
    transformers=[
        ("cat", categorical_transformer, categorical_features),
        ("num", "passthrough", numerical_features),
    ]
)

# max depth here cripples the tree algorithm a bit
# so that it isn't perfect
classifier = DecisionTreeClassifier(max_depth=2)

complete_model = Pipeline(
    steps=[("preprocessor", preprocessor), ("classifier", classifier)]
)

complete_model.fit(X, y);

A simple test run to see what our model outputs.

In [17]:
complete_model.predict(X)

array([1., 1., 0., 1., 0., 1., 1., 0., 0., 1.])

## Stage 3 - pipeline with a model

At that stage we have a model that was trained on the data from the pipeline. We are now ready to deploy a new, updated pipeline which makes use of that model.

First of all, we need to serialize the model. Here we are using joblib for that. Notice that for deployment purposes we expand permissions on the joblib file.

In [18]:
import joblib
import os

base_dir = os.path.abspath('./models/insurance_claims')
model_dir = os.path.join(base_dir, "insurance_claims_classifier")
model_file = os.path.join(model_dir, "insurance_claims_classifier.joblib")

joblib.dump(complete_model, model_file)

['/home/andrei/projects/insurance-claims-scv2-demo/models/insurance_claims/insurance_claims_classifier/insurance_claims_classifier.joblib']

In [19]:
!chmod 666 '{model_file}'

MLServer deployments require a model settings file that describes the model. It was already prepared, let's have a look.

In [20]:
model_settings_file = os.path.join(model_dir, "model-settings.json")
!cat '{model_settings_file}'

{
    "name": "insurance_claims_classifier",
    "implementation": "mlserver_sklearn.SKLearnModel",
    "parameters":
    {
        "uri": "./insurance_claims_classifier.joblib",
        "version": "v0.0.1"
    }
}

As before, we also need a yaml file for the model

In [21]:
model_seldon_file = os.path.join(base_dir, "insurance_claims_classifier.yaml")
!cat '{model_seldon_file}'


apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: insurance_claims_classifier
  namespace: seldon-mesh
spec:
  storageUri: "/mnt/models/insurance_claims/insurance_claims_classifier"
  requirements:
  - sklearn


At the moment SCv2 triggers only work on presence or absence of a record. Whereas our model outputs boolean values. So we also need a simple fork, that takes model ouputs and routes it to one stream or another, based on value. We call this model `is_complex_conditional`, and it can be inspected in the models folder. Here we upload it to SCv2 along with the classifier.

In [29]:
!seldon model load -f '{model_seldon_file}'
!seldon model load -f ./models/insurance_claims/is_complex_conditional.yaml

{}
{}


In [30]:
!seldon model status insurance_claims_classifier | jq -M . | grep state

          "state": "Available",
      "state": {
        "state": "ModelAvailable",


Now we need a new pipeline. You can see that we are reusing some of the steps from the previous pipeline.

In [31]:
!cat ./pipelines/insurance_claims_with_model.yaml

apiVersion: mlops.seldon.io/v1alpha1
kind: Pipeline
metadata:
  name: insurance_claims_with_model
  namespace: seldon-mesh
spec:
  steps:
    - name: insurance_claims_classifier
      inputs:
      - insurance_claims_with_model.inputs
    - name: is_complex_conditional
      inputs:
      - insurance_claims_classifier.outputs.predict
      tensorMap:
        insurance_claims_classifier.outputs.predict: is_complex
    - name: calculate_simple_claim_payout
      inputs:
      - insurance_claims_with_model.inputs.total_claim_amount
      - insurance_claims_with_model.inputs.claim_id
      triggers:
      - is_complex_conditional.outputs.is_simple_claim
    - name: calculate_complex_claim_payout
      inputs:
      - insurance_claims_with_model.inputs.total_claim_amount
      - insurance_claims_with_model.inputs.claim_id
      triggers:
      - is_complex_conditional.outputs.is_complex_claim
  output:
    steps:
    - calculate_simple_claim_payout
    - calculate_complex_claim_payout
    s

In [32]:
!seldon pipeline load -f ./pipelines/insurance_claims_with_model.yaml

{}


In [33]:
!seldon pipeline status insurance_claims_with_model | jq -M . | grep status

        "status": "PipelineReady",


With the new model and new pipeline ready, we can do the requests again.

In [34]:
for i in range(10):
    request_string = get_mlserver_request_string(i)
    !seldon pipeline infer insurance_claims_with_model --inference-mode grpc '{request_string}' | jq . | grep -A 1 "fp64Contents" | grep -v "fp64Contents"

          57288
          4056
          27720
          50720
          5200
          51280
          62920
          41272
          22160
          33840
