# How to Trigger the Feast Workflow using FlyteRemote

The goal of this notebook is to train a simple [Gaussian Naive Bayes model using sklearn](https://scikit-learn.org/stable/modules/generated/sklearn.naive_bayes.GaussianNB.html) on a modified [Horse-Colic dataset from UCI](https://archive.ics.uci.edu/ml/datasets/Horse+Colic).

The model aims to classify whether the horse's lesion is surgical or not.

### 01. Register the code

The workflow code is documented and rendered with Sphinx [here](https://docs.flyte.org/projects/cookbook/en/latest/auto/case_studies/feature_engineering/feast_integration/index.html). The pipeline is expressed in pure Python using [Flytekit](https://docs.flyte.org/projects/flytekit/en/latest/).

You can use [FlyteConsole](https://github.com/flyteorg/flyteconsole) to launch, monitor, and introspect Flyte executions. Here, however, let's use [flytekit.remote](https://docs.flyte.org/projects/flytekit/en/latest/design/control_plane.html) to interact with the Flyte backend.

In [1]:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config

# :py:meth:`~flytekit.configuration.Config.for_sandbox` returns a configuration that points at the local demo (sandbox) cluster.
remote = FlyteRemote(
    config=Config.for_sandbox(),
    default_project="flytesnacks",
    default_domain="development"
)



:py:meth:`~flytekit.remote.remote.FlyteRemote.register_script` can be used to register the workflow, here with an explicit container image and version.

In [2]:
from flytekit.configuration import ImageConfig

from feast_workflow import feast_workflow

wf = remote.register_script(
    feast_workflow,
    image_config=ImageConfig.from_images(
        # "ghcr.io/flyteorg/flytecookbook:feast_integration-latest"
        "ghcr.io/samhita-alla/feast:0.0.33"
    ),
    version="v1",
)
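
Registration above pins `version="v1"`. Flyte generally treats a registered version as fixed, so registering changed code under the same version string may be rejected. Here is a minimal sketch, assuming you would rather derive the version from the workflow source. `content_version` is a hypothetical helper (not part of Flytekit) that hashes a file's bytes into a short string.

```python
import hashlib
from pathlib import Path


def content_version(path: str, prefix: str = "v", length: int = 12) -> str:
    """Return a short, deterministic version string derived from a file's bytes.

    Hypothetical helper for illustration; it is not part of Flytekit.
    """
    digest = hashlib.sha256(Path(path).read_bytes()).hexdigest()
    # The same file contents always map to the same version string.
    return f"{prefix}{digest[:length]}"
```

You could then pass `version=content_version("feast_workflow.py")` to `register_script`, assuming the workflow lives in a file with that name next to the notebook.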



### 02. Launch an execution

#### Retrieve the latest registered version of the pipeline

FlyteRemote provides convenient methods to retrieve a version of the pipeline from the remote server.

**NOTE**: It is possible to get a specific version of the workflow and trigger a launch for that, but let's just get the latest.

In [3]:
lp = remote.fetch_launch_plan(name="feast_integration.feast_workflow.feast_workflow")
lp.id.version

'v1'
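
The note above mentions fetching a specific version instead of the latest. A small sketch of that, assuming `remote` is the `FlyteRemote` created earlier: `fetch_launch_plan` accepts a `version` keyword, and leaving it as `None` matches the cell above. The wrapper name `fetch_plan` is made up for this example.

```python
from typing import Any, Optional


def fetch_plan(remote: Any, name: str, version: Optional[str] = None) -> Any:
    """Fetch a launch plan, pinning ``version`` when one is given.

    ``remote`` is expected to be a ``FlyteRemote``. With ``version=None``
    the call is the same as in the cell above and resolves the latest version.
    """
    return remote.fetch_launch_plan(name=name, version=version)
```

For example, `fetch_plan(remote, "feast_integration.feast_workflow.feast_workflow", version="v1")` would pin the version registered in step 01.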

:py:meth:`~flytekit.remote.remote.FlyteRemote.execute` can be used to execute a launch plan.

In [4]:
execution = remote.execute(
    lp,
    inputs={"num_features_univariate": 5},
    wait=True
)
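
With `wait=True`, the cell blocks until the execution finishes. If you would rather start the run, do something else, and then wait with an explicit upper bound, a sketch along these lines may help. It assumes `remote` is a `FlyteRemote` and uses its `wait` method with a `timedelta` timeout. The function name and the 30-minute default are choices made for this example.

```python
from datetime import timedelta
from typing import Any, Dict


def launch_then_wait(
    remote: Any,
    launch_plan: Any,
    inputs: Dict[str, Any],
    timeout_minutes: int = 30,
) -> Any:
    """Start an execution without blocking, then wait with an explicit timeout."""
    execution = remote.execute(launch_plan, inputs=inputs, wait=False)
    # Other work (for example, printing the execution name) could go here.
    return remote.wait(execution, timeout=timedelta(minutes=timeout_minutes))
```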

### 03. Sync an execution

You can sync an execution to retrieve the workflow's outputs. Setting ``sync_nodes=True`` also retrieves the outputs of the intermediate nodes.

**NOTE**: You can also fetch an existing execution, including one that is still running, instead of launching a new one. If you launch an execution with a name that is already in use, Flyte respects that and returns the existing execution rather than starting a new one.

In [3]:
from flytekit.models.core.execution import WorkflowExecutionPhase

synced_execution = remote.sync(execution, sync_nodes=True)
print(f"Execution {synced_execution.id.name} is in {WorkflowExecutionPhase.enum_to_string(synced_execution.closure.phase)} phase")

Execution f31ec4e7f856d4d528f1 is in SUCCEEDED phase
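
The note in this section says that launching with an existing execution name does not start a new run. One way to make use of that is to derive the name from the inputs, so repeating a launch with identical inputs re-attaches to the earlier run. The sketch below assumes `remote` is a `FlyteRemote` and passes the name through the `execution_name` keyword of `execute`. `execution_name_for` is a hypothetical helper, and the 20-character, lowercase, letter-first format is an assumption about what Flyte accepts as a name.

```python
import hashlib
import json
from typing import Any, Dict


def execution_name_for(inputs: Dict[str, Any], prefix: str = "f") -> str:
    """Derive a repeatable 20-character name from the inputs (hypothetical helper)."""
    # sort_keys makes the name independent of dictionary ordering.
    payload = json.dumps(inputs, sort_keys=True).encode("utf-8")
    return prefix + hashlib.sha256(payload).hexdigest()[: 20 - len(prefix)]


def launch_named(remote: Any, launch_plan: Any, inputs: Dict[str, Any]) -> Any:
    """Launch under a name derived from the inputs, without blocking."""
    return remote.execute(
        launch_plan,
        inputs=inputs,
        execution_name=execution_name_for(inputs),
        wait=False,
    )
```

The trade-off is that a deliberate re-run with the same inputs needs a different name, for example by changing `prefix`.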


### 04. Retrieve the output

Fetch the model and the model prediction.

In [4]:
model = synced_execution.outputs["o0"]
prediction = synced_execution.outputs["o1"]
prediction

/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyte4jdjgu28/control_plane_metadata/local_flytekit/676810c6c2ab8918c578323d17c45adc/83fe2ce568a95b4e11659a2709bccd4a.npy

**NOTE**: The output model is available locally as a `JoblibSerializedFile`, which can be downloaded and loaded.

In [5]:
model

/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyte4jdjgu28/control_plane_metadata/local_flytekit/f17f765e7ad7cd32815d76d1c2b7fe20/model.joblib.dat
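
To inspect the model directly rather than passing it to `predict`, you could load the file with `joblib`. This is a sketch under two assumptions: `model` behaves like a local path (a string or an `os.PathLike` object), and the `joblib` package is installed in the notebook environment. `load_joblib_model` is a name made up for this example.

```python
import os
from typing import Any, Union


def load_joblib_model(path: Union[str, os.PathLike]) -> Any:
    """Load a joblib-serialized model from a local path."""
    local_path = os.fspath(path)
    if not os.path.isfile(local_path):
        raise FileNotFoundError(f"No model file at {local_path}")
    # Third-party dependency, imported lazily so the path check works without it.
    import joblib

    return joblib.load(local_path)
```

Calling `load_joblib_model(model)` should then return the fitted estimator, provided the scikit-learn version in your environment is compatible with the one used in the workflow image.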

Fetch the ``repo_config``.

In [6]:
repo_config = synced_execution.node_executions["n0"].outputs["o0"]
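
Node IDs such as `n0` are assigned by position in the workflow, so it is easy to ask for a node that was not synced. A small sketch of a lookup with a more helpful error, assuming the execution was synced with `sync_nodes=True` so that `node_executions` is populated. `node_output` is a hypothetical helper, not a Flytekit function.

```python
from typing import Any


def node_output(execution: Any, node_id: str, key: str) -> Any:
    """Return ``execution.node_executions[node_id].outputs[key]`` with a clearer error."""
    nodes = execution.node_executions
    if node_id not in nodes:
        available = ", ".join(sorted(nodes))
        raise KeyError(f"Unknown node {node_id!r}; synced nodes: {available}")
    return nodes[node_id].outputs[key]
```

With it, the cell above could be written as `repo_config = node_output(synced_execution, "n0", "o0")`.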

### 05. Generate predictions

Reuse the `predict` function from the workflow to generate predictions. Flytekit will automatically manage the IO for you!

#### Load features from the online feature store

In [11]:
import os

from feast_workflow import predict, FEAST_FEATURES, retrieve_online

# Point both the AWS tooling and Feast at the demo cluster's MinIO endpoint and credentials.
os.environ["AWS_ENDPOINT"] = os.environ["FEAST_S3_ENDPOINT_URL"] = "http://localhost:30084/"
os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["AWS_SECRET_ACCESS_KEY"] = "miniostorage"

inference_point = retrieve_online(
    repo_config=repo_config,
    online_store=synced_execution.node_executions["n4"].outputs["o0"],
    data_point=533738,
)
inference_point

minio


{"asctime": "2022-09-19 18:32:46,120", "name": "flytekit", "levelname": "ERROR", "message": "Error from command '['aws', 's3', 'cp', 's3://my-s3-bucket/test/18/f31ec4e7f856d4d528f1-n4-0/9237a930d2c15fd2fa9dd87383a4ef3e/online.db', '/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyte-v4dqxgo0/sandbox/local_flytekit/cb8bec252c77f43a58a7586dba26c9a1/online.db']':\nb'fatal error: An error occurred (403) when calling the HeadObject operation: Forbidden\\n'\n"}
{"asctime": "2022-09-19 18:32:47,614", "name": "flytekit", "levelname": "ERROR", "message": "Error from command '['aws', '--no-sign-request', 's3', 'cp', 's3://my-s3-bucket/test/18/f31ec4e7f856d4d528f1-n4-0/9237a930d2c15fd2fa9dd87383a4ef3e/online.db', '/var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyte-v4dqxgo0/sandbox/local_flytekit/cb8bec252c77f43a58a7586dba26c9a1/online.db']':\nb'fatal error: An error occurred (403) when calling the HeadObject operation: Forbidden\\n'\n"}
{"asctime": "2022-09-19 18:32:47,618", "name": "flyte

FlyteAssertion: Failed to get data from s3://my-s3-bucket/test/18/f31ec4e7f856d4d528f1-n4-0/9237a930d2c15fd2fa9dd87383a4ef3e/online.db to /var/folders/6r/9pdkgpkd5nx1t34ndh1f_3q80000gn/T/flyte-v4dqxgo0/sandbox/local_flytekit/cb8bec252c77f43a58a7586dba26c9a1/online.db (recursive=False).

Original exception: Called process exited with error code: 1.  Stderr dump:

b'fatal error: An error occurred (403) when calling the HeadObject operation: Forbidden\n'
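
The cell above sets the MinIO endpoint and credentials for the whole notebook process. If you prefer to keep them scoped to the retrieval call, a context manager is one option. This sketch does not diagnose the 403 shown in the log. It only sets the variables for the duration of a `with` block and restores the previous values afterwards. `temporary_env` and `MINIO_ENV` are names made up for this example, and the values are copied from the cell above.

```python
import os
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def temporary_env(overrides: Dict[str, str]) -> Iterator[None]:
    """Set environment variables inside a ``with`` block, then restore them."""
    previous = {key: os.environ.get(key) for key in overrides}
    os.environ.update(overrides)
    try:
        yield
    finally:
        for key, old_value in previous.items():
            if old_value is None:
                # The variable did not exist before, so remove it again.
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value


# Values copied from the cell above; they assume the local demo cluster's MinIO.
MINIO_ENV = {
    "AWS_ENDPOINT": "http://localhost:30084/",
    "FEAST_S3_ENDPOINT_URL": "http://localhost:30084/",
    "AWS_ACCESS_KEY_ID": "minio",
    "AWS_SECRET_ACCESS_KEY": "miniostorage",
}
```

The `retrieve_online(...)` call would then go inside `with temporary_env(MINIO_ENV):`.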

#### Generate a prediction

In [None]:
predict(model_ser=model, features=inference_point)