# Create the inference wrapper

To use the trained model, you must write a wrapper script that loads the model and feeds it with data coming from the configured data source. This notebook helps you write such a wrapper and understand how it works.

To run this notebook, you need:
- the training data set used in notebook [10-CreateClusteringModel](10-CreateClusteringModel.ipynb)
- the model created in that notebook

# Load the model

The wrapper script must first load the model into memory, once, so that predictions can be made as fast as possible.  
The filesystem layout (Python scripts in the `src` folder, the model in the `models` folder) is reproduced when the pipeline runs, so the same code works here while experimenting and later in the pipeline.

In [None]:
import sys
import joblib
from pathlib import Path
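sys.path.insert(0, str(Path('../src').resolve()))  # make the project's modules in ../src importable

model_path = Path('../models/clustering-model.joblib').resolve()

# Load the trained pipeline once; it stays in memory for all subsequent predictions
with open(model_path, 'rb') as model_file:
    pipe = joblib.load(model_file)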

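# Extract model metadata

Besides the trained model itself, the wrapper needs some metadata. The window parameters are read from the parameters of the pipeline stored in the joblib file, while the input and output names are set to match the training setup. We keep them in the following global variables:
- `pipe`: the trained Scikit-Learn Pipeline itself
- `input_columns`: the names of the pipeline inputs
- `output_name`: the name of the pipeline output
- `window_size` and `step_size`: the number of rows in a window, and the number of rows the window advances between two consecutive predictions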

In [None]:
import numpy

input_columns = ["ph1", "ph2", "ph3"]
output_name = "prediction"
# set model parameters
window_size = pipe.get_params().get('preprocessing__windowing__window_size')
step_size = pipe.get_params().get('preprocessing__windowing__window_step')
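Hardcoded names such as `input_columns` and `output_name` have to be kept in sync with the model by hand. One option is to store them in the same file as the model. The following is a minimal sketch, assuming a hypothetical dict layout with the keys `model`, `input_columns` and `output_name`; `save_bundle` and `load_bundle` are illustrative names. It uses the standard `pickle` module so it runs without extra dependencies; `joblib.dump` and `joblib.load` can take its place.

```python
import pickle
import tempfile
from pathlib import Path


def save_bundle(path, model, input_columns, output_name):
    """Store the model and its metadata together in one file (hypothetical layout)."""
    bundle = {"model": model, "input_columns": input_columns, "output_name": output_name}
    # joblib.dump(bundle, path) could be used here instead of pickle
    with open(path, "wb") as f:
        pickle.dump(bundle, f)


def load_bundle(path):
    """Load the bundle and unpack the model and its metadata."""
    # joblib.load(path) would be the counterpart of joblib.dump
    with open(path, "rb") as f:
        bundle = pickle.load(f)
    return bundle["model"], bundle["input_columns"], bundle["output_name"]


with tempfile.TemporaryDirectory() as tmp_dir:
    path = Path(tmp_dir) / "model-bundle.pkl"
    # A plain dict stands in for the trained pipeline so the sketch runs on its own
    save_bundle(path, {"kind": "placeholder"}, ["ph1", "ph2", "ph3"], "prediction")
    loaded_model, loaded_columns, loaded_output_name = load_bundle(path)

print(loaded_columns, loaded_output_name)
```

With this layout, the wrapper reads the column names from the file instead of repeating them in code.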


# Input data

AI Inference Server wraps the acquired input values into a dictionary and passes it as the single parameter of `process_input(data: dict)`. Each input variable is a separate entry in the dictionary. In the code cell below, we create the same format from the familiar training data and print the first 10 elements of the resulting list.

In [None]:
import glob
import pandas

data_files = sorted(glob.glob("../data/raw/*.csv"))  # sorted, so data_files[0] is deterministic
input_series = pandas.read_csv(data_files[0])  # read test data from the same CSV used to train the model
input_list = input_series[input_columns].to_dict(orient='records')  # one dict per row, as the `process_input(..)` method receives them
input_list[:10]  # show the first 10 elements of the list
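The cell above relies on pandas. To see the payload format without it, here is a stdlib-only sketch: `csv.DictReader` turns each CSV row into a dict, and only the model inputs are kept. The inline CSV text and the `to_payloads` helper are hypothetical; mapping empty cells to `None` is an assumption that mirrors the `None` handling in `process_data` further down.

```python
import csv
import io

# A tiny inline CSV stands in for ../data/raw/*.csv; the second row lacks a ph2 value
csv_text = "ph1,ph2,ph3,extra\n1.5,2.5,3.5,x\n1.6,,3.4,y\n"

input_columns = ["ph1", "ph2", "ph3"]


def to_payloads(text, columns):
    """Convert CSV text into one dict per row, keeping only the model inputs."""
    payloads = []
    for row in csv.DictReader(io.StringIO(text)):
        # Empty cells become None (assumption: how a missing input value could arrive)
        payloads.append({c: float(row[c]) if row[c] != "" else None for c in columns})
    return payloads


payloads = to_payloads(csv_text, input_columns)
print(payloads)
# [{'ph1': 1.5, 'ph2': 2.5, 'ph3': 3.5}, {'ph1': 1.6, 'ph2': None, 'ph3': 3.4}]
```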

# Feed window of data into the model

Now we can create a method that utilizes the model and produces a prediction from a window of input rows according to `window_size`:

In [None]:
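def predict(model, model_input: numpy.ndarray):
    # model_input holds exactly one window, so we return the first (and only expected) prediction
    prediction = model.predict(model_input)

    return prediction[0]

# Emulate a fully collected window: one row per input, columns in `input_columns` order
window_input = pandas.DataFrame(input_list[:window_size])[input_columns].to_numpy()
predict(pipe, window_input)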
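`predict()` hands the pipeline a 2D table whose columns must follow `input_columns`, regardless of the key order inside each payload dict. The following stdlib-only sketch shows that conversion and mirrors the list comprehension inside `process_data` below; the helper name `to_rows` is hypothetical, and its result could be passed to `numpy.array` to obtain the matrix.

```python
import math

input_columns = ["ph1", "ph2", "ph3"]


def to_rows(payloads, columns):
    """Turn payload dicts into rows ordered like `columns`; None becomes NaN."""
    return [
        [math.nan if p[c] is None else p[c] for c in columns]
        for p in payloads
    ]


# Key order in the dicts does not matter; the column list fixes the order
payloads = [{"ph3": 3.0, "ph1": 1.0, "ph2": 2.0}, {"ph1": 1.1, "ph2": None, "ph3": 3.1}]
rows = to_rows(payloads, input_columns)
print(rows)  # [[1.0, 2.0, 3.0], [1.1, nan, 3.1]]
```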

# Process incoming data and aggregate a window

First we set up a buffer array named `aggregated_data` that stores incoming rows until it holds `window_size` of them.

Then we define `process_data(input_dict: dict)`, which we call once for every row of input data. It extracts the expected input variables from the dictionary received from AI Inference Server, replaces missing (`None`) values with `NaN`, and appends the row to `aggregated_data`. As long as it is only accumulating input and the window is not yet full, the function returns `None`.

Once the window is full, the function produces a prediction and returns it in a dictionary with `output_name` as the key. This dictionary can be passed directly back to AI Inference Server. After each prediction, we also remove the first `step_size` rows from `aggregated_data`, so that the next window is formed from the remaining rows plus subsequent input.

This function already has the signature of an entrypoint. It does not qualify as one only because of a current technical limitation: the entrypoint must be located in the root folder of the pipeline component.

In [None]:
# create an aggregation buffer for incoming rows; float dtype so that missing values can be stored as NaN
aggregated_data = numpy.empty((0, len(input_columns)), dtype=float)

def process_data(input_dict: dict):
    global aggregated_data
    
    values = [[numpy.nan if input_dict[variable] is None else input_dict[variable] for variable in input_columns]]
    aggregated_data = numpy.append(aggregated_data, values, axis=0)

    if len(aggregated_data) >= window_size:
        output = {output_name: predict(pipe, aggregated_data)}
        aggregated_data = aggregated_data[step_size:]
        return output

    return None

Now we can emulate AI Inference Server calling the function 300 times, once per input row, with the expected payload.

In [None]:
for i in range(300):
    result = process_data(input_list[i])
    if result is not None:
        print(i, result)
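The same buffering logic can be written without a global variable. The following is a minimal sketch, assuming a `collections.deque` as the buffer and a stub model in place of the trained pipeline; `make_processor` and `mean_of_first_column` are hypothetical names. A deque makes dropping the oldest `step_size` rows cheap, whereas `numpy.append` copies the whole array on every call.

```python
from collections import deque


def make_processor(window_size, step_size, model, output_name="prediction"):
    """Return a process_data-like function backed by a deque buffer.

    `model` is any callable that maps a list of rows to a single prediction.
    """
    buffer = deque()

    def process(row):
        buffer.append(row)
        if len(buffer) < window_size:
            return None  # still accumulating
        output = {output_name: model(list(buffer))}
        # Drop the oldest step_size rows (or all of them if step_size exceeds the buffer)
        for _ in range(min(step_size, len(buffer))):
            buffer.popleft()
        return output

    return process


def mean_of_first_column(window):
    """Stub model: the mean of the first column over the window."""
    return sum(r[0] for r in window) / len(window)


process = make_processor(window_size=4, step_size=2, model=mean_of_first_column)
results = [(i, process([float(i)])) for i in range(8)]
print([(i, r) for i, r in results if r is not None])
# [(3, {'prediction': 1.5}), (5, {'prediction': 3.5}), (7, {'prediction': 5.5})]
```

With a window of 4 and a step of 2, the first prediction appears after 4 rows and every 2 rows after that, just like `process_data` with its own parameters.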

# Create entrypoint

In this case, the entrypoint can simply delegate to the function defined above.

In [None]:
def process_input(data: dict):

    return process_data(data)

# Try it with an example

`process_input(...)` is triggered for each input. It returns `None` until `window_size` rows have been collected; when an input completes a window, it returns a prediction.

In [None]:
output_list = []
for input_data in input_list[:window_size]:
    output_list.append(process_input(input_data))

[output for output in output_list if output]

# Allow parameter updates

AI Inference Server allows you to change predefined parameters in your pipeline at runtime. This way you can set your variables while the pipeline is running. For example, we can change our `step_size` to produce predictions more frequently.

In [None]:
def update_parameters(params: dict):
    global step_size
    step_size = params.get("step_size", step_size)
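`update_parameters` accepts whatever value arrives. The following sketch shows a more defensive variant, assuming the value may arrive as a string or a number; `update_step_size` is a hypothetical helper. It rejects values below 1, which would stop the buffer from advancing. It also rejects values above `window_size`, because with the slicing used above such values behave exactly like `step_size == window_size`.

```python
def update_step_size(params, current, window_size):
    """Return the new step size from `params`, or `current` if absent or invalid."""
    raw = params.get("step_size")
    if raw is None:
        return current
    try:
        value = int(raw)  # accepts ints and numeric strings such as "100"
    except (TypeError, ValueError):
        return current
    # Keep the step within 1..window_size so consecutive windows overlap or touch
    if 1 <= value <= window_size:
        return value
    return current


print(update_step_size({"step_size": "100"}, 300, 300))  # 100
print(update_step_size({"step_size": 0}, 300, 300))      # 300
print(update_step_size({}, 300, 300))                    # 300
```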

For the sake of example, we now calculate predictions for 900 data points with the original step size of 300. This gives 3 predictions:

In [None]:
output_list = []
for input_data in input_list[:window_size*3]:
    output_list.append(process_input(input_data))

[output for output in output_list if output]

When we change the step size to 100 and calculate predictions for the same 900 data points, we get 7 predictions: the first window needs 300 data points, and after that a new prediction is produced after every 100th data point.

In [None]:
# one prediction for the first full window, plus one for every further 100 data points
1 + (900 - 300) // 100

In [None]:
update_parameters({'step_size': 100})

output_list = []
for input_data in input_list[:window_size*3]:
    output_list.append(process_input(input_data))

[output for output in output_list if output]
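The arithmetic above generalizes to any input length. The following sketch puts a closed-form count next to a brute-force replay of the buffer bookkeeping in `process_data`. Both helpers are hypothetical and assume that the buffer starts empty and that `step_size >= 1`.

```python
def expected_predictions(n_rows, window_size, step_size):
    """Closed-form count of predictions for n_rows inputs, starting from an empty buffer."""
    if n_rows < window_size:
        return 0
    # A step larger than the window behaves like step == window_size,
    # because slicing off more rows than the buffer holds just empties it
    step = min(step_size, window_size)
    return 1 + (n_rows - window_size) // step


def simulated_predictions(n_rows, window_size, step_size):
    """Count predictions by replaying the buffer logic of process_data."""
    buffered, count = 0, 0
    for _ in range(n_rows):
        buffered += 1
        if buffered >= window_size:
            count += 1
            buffered = max(buffered - step_size, 0)
    return count


print(expected_predictions(900, 300, 300))  # 3
print(expected_predictions(900, 300, 100))  # 7
```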

# Return metrics

You can return multiple outputs, and some of those outputs can serve as custom metrics for your running model.

When returning such custom metric outputs, the int or float value must be serialized as a JSON string of the form `{"value": <number>}`, as in the example below.

The metrics have to be defined separately when creating a pipeline component.

In [None]:
import json
import numpy

aggregated_data = numpy.empty((0, len(input_columns)), dtype=float)  # reset the buffer; float so NaN can be stored

def process_data(input_dict: dict):
    global aggregated_data
    
    values = [[numpy.nan if input_dict[variable] is None else input_dict[variable] for variable in input_columns]]
    aggregated_data = numpy.append(aggregated_data, values, axis=0)

    if len(aggregated_data) >= window_size:
        output = {output_name: predict(pipe, aggregated_data)}
        features = pipe["preprocessing"].transform(aggregated_data)[0]
        output["model_input_min"]  = metric_output(features[0])
        output["model_input_max"]  = metric_output(features[1])
        output["model_input_mean"] = metric_output(features[2])
        aggregated_data = aggregated_data[step_size:]
        return output

    return None

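def metric_output(value) -> str:
    # Custom metrics are serialized as a JSON string: {"value": <number>}.
    # NumPy scalars expose .item(), which turns them into plain Python int/float for json.dumps
    if hasattr(value, "item"):
        value = value.item()
    return json.dumps({"value": value})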

In [None]:
for i in range(300):
    result = process_data(input_list[i])
    if result is not None:
        print(i, result)
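To see the shape of a complete output without the trained model, the following stdlib-only sketch builds one output dictionary with a prediction and the three metrics. It also shows how a consumer could read a metric back. `build_output` is a hypothetical helper, the feature values are made up, and the min/max/mean order is taken from the cell above.

```python
import json


def metric_output(value):
    """Serialize one numeric metric as a JSON string of the form {"value": <number>}."""
    # NumPy scalars expose .item(); converting them keeps json.dumps from rejecting e.g. float32
    if hasattr(value, "item"):
        value = value.item()
    return json.dumps({"value": value})


def build_output(prediction, features, output_name="prediction"):
    """Hypothetical helper: the prediction plus min/max/mean metrics, as in the cell above."""
    return {
        output_name: prediction,
        "model_input_min": metric_output(features[0]),
        "model_input_max": metric_output(features[1]),
        "model_input_mean": metric_output(features[2]),
    }


output = build_output(1, [0.5, 2.0, 1.25])
print(output)

# A consumer can recover the number by parsing the string again
print(json.loads(output["model_input_mean"])["value"])  # 1.25
```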

Once the inference wrapper works as intended, update the code in [entrypoint.py](../entrypoint.py) and [inference.py](../src/si/inference.py) to match yours, and create an edge package in notebook [30-CreatePipelinePackage](30-CreatePipelinePackage.ipynb).