# Part 4: Deploy a ML use case with inputs and outputs 

In Part 3, we have built and run our second ML pipeline to retrieve our trained model from the data store, give some new data to it as our pipeline input and retrieve the result as an ouput of our pipeline execution.

What if we want to let an external user execute our pipeline ? Or if we want to execute our pipeline on a certain schedule ?

⇒ We need to deploy our pipeline via an **endpoint** and a **periodic deployment**.

## Import libraries

In [None]:
from craft_ai_sdk import CraftAiSdk, Input, Output, InputSource, OutputDestination
import dotenv
import os
import requests

dotenv.load_dotenv()

In [None]:
CRAFT_AI_SDK_TOKEN = os.environ.get("CRAFT_AI_SDK_TOKEN")
CRAFT_AI_ENVIRONMENT_URL = os.environ.get("CRAFT_AI_ENVIRONMENT_URL")

## SDK instantiation

In [None]:
sdk = CraftAiSdk(sdk_token=CRAFT_AI_SDK_TOKEN, environment_url=CRAFT_AI_ENVIRONMENT_URL)

## Pipeline creation

### Declare inputs and outputs

For the input the name corresponds to the name of an argument of your pipeline's function. In our case name="input_data" and "input_model" (as in the first line of function).

The input about the model is not a string anymore, it's now defined as a file (that we can to retrieve from the data store). 

In [None]:
prediction_input = Input(
    name="input_data", 
    data_type="json"
)

model_input = Input(
    name="input_model", 
    data_type="file"
)

We can notice the same thing for the output, it is now a file (that we want to store on the data store)

In [None]:

prediction_output = Output(
    name="predictions",
    data_type="json"
)

### Create the pipeline

We create the pipeline as in the previous parts

In [None]:
sdk.create_pipeline(
    pipeline_name="part-4-iris-deployment",
    function_path="src/part-4-iris-predict.py",
    function_name="predictIris", 
	description="This function retrieves the trained model and classifies the input data by returning the prediction.",
	container_config={
        "local_folder": "../../get_started",
        "requirements_path": "requirements.txt",
        },
    inputs=[prediction_input, model_input],
    outputs=[prediction_output],
)

In [None]:
sdk.get_pipeline(pipeline_name="part-4-iris-deployment")

## Create the deployments

### Create the endpoint with input and output mappings

An **Endpoint** is a publicly accessible URL that launches the execution of the Pipeline.

Without the platform, you would need to write an api with a library like Flask, Fast API or Django and deploy it on a server that you would have to maintain.



#### IO Mappings

Here, we want to be able to trigger the execution by the endpoint and not only by a run anymore. 

Now, we want the user to be able to use it:
- to send the input data directly to the pipeline via the endpoint
- to retrieve the results on the data store
 
We want also to specify the path to the stored model on the data store, so that the endpoint will take this model directly in the data store. The user won't be the one selecting the model used, it's only on the technical side.



In [None]:
inputs_mapping_endpoint = [
    InputSource(
        pipeline_input_name="input_model",
        datastore_path="get_started/models/iris_knn_model.joblib"
        ),
    InputSource(
        pipeline_input_name="input_data", 
        endpoint_input_name="input_data"
        )
]

output_mapping_endpoint = [
    OutputDestination(
        pipeline_output_name="predictions",
        endpoint_output_name="iris_type")
]

#### Endpoint

With the platform you can create an Endpoint a simple call to the sdk.create_deployment() function of the SDK, by choosing an endpoint_name. This name is used to reference the created endpoint and is further used in its URL. 

For each deployment you can chose either to use the `elastic` mode or the `low-latency` mode. 

- The `low-latency` mode is used for real-time executions, a part of the environnement is already reserved for this deployement's executions. 
- The `elastic` mode is used when there is no need for real-time executions, the next executions will be executed on any avaialble part on the environnement.

By default, the `elastic` mode is used.

In [None]:
endpoint = sdk.create_deployment(
    execution_rule="endpoint",
    pipeline_name="part-4-iris-deployment",
    deployment_name="part-4-iris-endpoint",
    mode="low_latency",
    inputs_mapping=inputs_mapping_endpoint,
    outputs_mapping=output_mapping_endpoint
)

### Target the endpoint

We prepare some input data as in the previous part:

In [None]:
import numpy as np
from sklearn import datasets

np.random.seed(0)
indices = np.random.permutation(150)
iris_X, iris_y = datasets.load_iris(return_X_y=True, as_frame=True)
iris_X_test = iris_X.loc[indices[90:120],:]

new_data = iris_X_test.to_dict(orient="index")

In [None]:
new_data

Once your endpoint is created, you can execute it with a direct HTTP call with the endpoint token. Calling/triggering an endpoint allows us to execute the associated pipeline. Using Python, it can be executed with:

In [None]:
endpoint_url = sdk.base_environment_url  + "/endpoints/" + endpoint["name"]

inputs = {"input_data": new_data}
endpoint_token = endpoint["endpoint_token"]

request = requests.post(endpoint_url, headers={"Authorization": f"EndpointToken {endpoint_token}"}, json=inputs)
request.json()

The HTTP code 200 indicates that the request has been taken into account. In case of an error, we can expect an error code starting with 4XX or 5XX.

But, obviously, you can execute it in any other way (curl command in bash, Postman…).

### Re train your model periodically

Let's imagine that our dataset is frequently updated, for instance we get new labeled iris data every day. In this case we might want to retrain our model by triggering our training pipeline `part4-iris-train` every day. The platform can do this automatically using the `periodic` execution rule in our deployment.

A **periodic** deployment allows a pipeline to be executed at a certain schedule. For example, every monday at a certain time, every month, every 5 minutes etc.

The inputs and outputs have to be defined, with a constant value or a mapping to the data store.

First we will update our trainIris function so that it produces a file output containing our model, that we will then map to the datastore. Check the updated version of this function in `src/part-4-iris-predict.py`

We can thet create the pipeline as we are used to.

In [None]:
train_output = Output(
    name="model",
    data_type="file"
)

sdk.create_pipeline(
    pipeline_name="part-4-iristrain",
    function_path="src/part-4-iris-predict.py",
    function_name="trainIris",
    container_config={
        "local_folder": "../../get_started",
        "requirements_path": "requirements.txt",
        },
    outputs=[train_output],
)



Now let's create a deployment that executes our pipeline every 5 minutes. In our case, we will map the prediction output (which is our only I/O) to the datastore on the same path that is used in the pediction endpoint deployment. This way our prediction pipeline will automatically use the last version of our model for predictions.

Note that the schedule argument takes the CRON syntax (examples here: https://crontab.guru/).

In [None]:
output_mapping_periodic = OutputDestination(
    pipeline_output_name="model",
    datastore_path="get_started/models/iris_knn_model.joblib"
)

In [None]:
periodic = sdk.create_deployment(
    execution_rule="periodic",
	pipeline_name="part-4-iristrain",
	deployment_name="part-4-iristrain",
    schedule="*/5 * * * *",
    outputs_mapping=[output_mapping_periodic],
    
)

Our training pipeline will now be executed every 5 minutes, updating our model with the potential new data. The predict pipeline will then use this updated model automatically.

You can check that you actually have a new execution every 5 minutes using the sdk or via the web interface.