# Core 7.8 Feature Store - Ingestion Methods

In this section, we will take a look at the various ways to ingest data into the Feature Store: ad-hoc, batch job, and real-time.

---

### References

Much of the following content is derived from the official documentation:
- [Feature Store: Data ingestion](https://docs.mlrun.org/en/stable/feature-store/feature-store-data-ingestion.html)

---

### What is an ingestion method?

Up until this point in the `Feature Store` module, all of our data ingestion has run directly in a notebook. But what if you want to ingest data via a scheduled job, or in real time? This is where the different ingestion methods come into play.

---

### What ingestion methods are supported?

At this time, there are three supported ingestion methods:
- `Ad-hoc`: In a notebook or script (like we have done up until this point)
- `Job`: In a dedicated Kubernetes pod - can be scheduled or started remotely
- `Real-Time`: In a real-time endpoint from a source like a Kafka stream or HTTP request

All of these ingestion methods are supported by each of the three `engines`. Additionally, each ingestion method supports `transformations` and `aggregations`.

---

### Setup

```python
import pandas as pd
import mlrun
import mlrun.feature_store as fstore
from mlrun.datastore.sources import DataFrameSource, CSVSource, HttpSource
import requests

project = mlrun.get_or_create_project("iguazio-academy", context="./")

data = pd.read_csv("data/heart_disease_categorical.csv")

data.head()
```

> 2022-04-29 19:53:34,612 [info] loaded project iguazio-academy from MLRun DB


| | patient_id | age | sex | cp | exang | fbs | slope | thal |
|---|---|---|---|---|---|---|---|---|
| 0 | e443544b-8d9e-4f6c-9623-e24b6139aae0 | 52 | male | typical_angina | no | False | downsloping | normal |
| 1 | 8227d3df-16ab-4452-8ea5-99472362d982 | 53 | male | typical_angina | yes | True | upsloping | normal |
| 2 | 10c4b4ba-ab40-44de-8aba-6bdb062192c4 | 70 | male | typical_angina | yes | False | upsloping | normal |
| 3 | f0acdc22-7ee6-4817-a671-e136211bc0a6 | 61 | male | typical_angina | no | False | downsloping | normal |
| 4 | 2d6b3bca-4841-4618-9a8c-ca902010b009 | 62 | female | typical_angina | no | True | flat | reversable_defect |


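Each ingestion method can run a `Feature Set` that includes a computation graph with `transformations` and `aggregations`. To keep the examples focused on the ingestion methods themselves, we will first create a simple `Feature Set` with `patient_id` as its entity and reuse it in each of the sections below: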

```python
heart_set = fstore.FeatureSet(
    name="heart-disease-all",
    entities=[fstore.Entity("patient_id")],
    description="Heart disease data all columns",
    engine="storey",
)
```

---

### Ad-Hoc Ingestion

This is what we have been doing up until this point. For the sake of completeness, an example of ad-hoc ingestion is as follows:

```python
fstore.ingest(featureset=heart_set, source=DataFrameSource(df=data))
```

Converting input from bool to <class 'numpy.uint8'> for compatibility.


| patient_id | age | sex | cp | exang | fbs | slope | thal |
|---|---|---|---|---|---|---|---|
| e443544b-8d9e-4f6c-9623-e24b6139aae0 | 52 | male | typical_angina | no | False | downsloping | normal |
| 8227d3df-16ab-4452-8ea5-99472362d982 | 53 | male | typical_angina | yes | True | upsloping | normal |
| 10c4b4ba-ab40-44de-8aba-6bdb062192c4 | 70 | male | typical_angina | yes | False | upsloping | normal |
| f0acdc22-7ee6-4817-a671-e136211bc0a6 | 61 | male | typical_angina | no | False | downsloping | normal |
| 2d6b3bca-4841-4618-9a8c-ca902010b009 | 62 | female | typical_angina | no | True | flat | reversable_defect |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 5d2fc80f-ed64-4e1c-9c95-3baace09118b | 59 | male | atypical_angina | yes | False | downsloping | reversable_defect |
| 01548a7e-0f68-4308-80de-cd93fdbfb903 | 60 | male | typical_angina | yes | False | flat | normal |
| f8c97cc1-8a3a-4b8e-965c-58e75c2379e6 | 47 | male | typical_angina | yes | False | flat | reversable_defect |
| d7fc9e01-b792-44da-88fa-a0057527da3f | 50 | female | typical_angina | no | False | downsloping | reversable_defect |
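The ingested result above is indexed by `patient_id`, the entity declared on `heart_set`. The sketch below reproduces that layout locally with plain pandas, which can serve as a quick pre-check before ingesting. The helper `key_by_entity` is hypothetical, and the duplicate check rests on our own assumption that each entity value should appear only once (a key-value online target such as `nosql` keeps one row per key).

```python
import pandas as pd

# Two rows copied from the data.head() output shown earlier in this section
sample = pd.DataFrame(
    [
        {"patient_id": "e443544b-8d9e-4f6c-9623-e24b6139aae0", "age": 52, "sex": "male",
         "cp": "typical_angina", "exang": "no", "fbs": False, "slope": "downsloping", "thal": "normal"},
        {"patient_id": "8227d3df-16ab-4452-8ea5-99472362d982", "age": 53, "sex": "male",
         "cp": "typical_angina", "exang": "yes", "fbs": True, "slope": "upsloping", "thal": "normal"},
    ]
)

ENTITY = "patient_id"  # the entity declared on heart_set


def key_by_entity(df: pd.DataFrame, entity: str) -> pd.DataFrame:
    """Hypothetical helper: index df by the entity column, rejecting duplicate keys.

    Assumption: each entity value should appear once, since a key-value
    target stores a single row per key.
    """
    if not df[entity].is_unique:
        raise ValueError(f"duplicate values in entity column {entity!r}")
    return df.set_index(entity)


keyed = key_by_entity(sample, ENTITY)
print(keyed.index.name)  # patient_id
print(list(keyed.columns))
```

Running the check on the full `data` DataFrame before calling `fstore.ingest` surfaces duplicate patients early instead of after they reach the targets.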


---

### Job Ingestion

Unlike `ad-hoc` ingestion, job ingestion will create a dedicated Kubernetes pod with specified resources to perform the ingestion.

The basic mechanism for creating an ingestion job is to create a `RunConfig` that describes things like the Docker image to use and whether to mount a shared filesystem into the pod (here via `mlrun.auto_mount()`). An example is as follows:

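```python
# Run ingestion as a Kubernetes job using the mlrun/mlrun image;
# auto_mount() mounts the shared filesystem into the job's pod
config = fstore.RunConfig(image="mlrun/mlrun", kind="job").apply(mlrun.auto_mount())
```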

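Then, you ingest as before, adding the `run_config` parameter. Note that you cannot use a `DataFrameSource` here: the Kubernetes pod does not share the execution environment of your script or notebook, so it has no access to your in-memory DataFrame. Instead, the job must read from a source it can reach, such as a CSV file on the mounted filesystem.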
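If your data only exists as a DataFrame in the notebook, one option is to write it to a file the job pod can read first. The sketch below does that with pandas; the helper `export_for_job` and the file name are hypothetical, and a temporary directory stands in for the shared mount so the example runs anywhere. In the notebook, you would instead choose a location on the filesystem mounted by `mlrun.auto_mount()` (an assumption about your setup, mirroring the `/User/...` path used next).

```python
import os
import tempfile

import pandas as pd


def export_for_job(df: pd.DataFrame, path: str) -> str:
    """Hypothetical helper: write df to CSV so a separate job pod can read it.

    index=False stops pandas from writing the RangeIndex as an extra, unnamed
    column (which would reappear as "Unnamed: 0" when the file is read back).
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    df.to_csv(path, index=False)
    return path


# Two rows copied from the data shown earlier in this section
sample = pd.DataFrame(
    [
        {"patient_id": "e443544b-8d9e-4f6c-9623-e24b6139aae0", "age": 52, "sex": "male",
         "cp": "typical_angina", "exang": "no", "fbs": False, "slope": "downsloping", "thal": "normal"},
        {"patient_id": "8227d3df-16ab-4452-8ea5-99472362d982", "age": 53, "sex": "male",
         "cp": "typical_angina", "exang": "yes", "fbs": True, "slope": "upsloping", "thal": "normal"},
    ]
)

# Assumption: a temp directory stands in for the shared mount in this sketch
target = os.path.join(tempfile.mkdtemp(), "heart_disease_export.csv")
export_for_job(sample, target)

round_trip = pd.read_csv(target)
print(list(round_trip.columns))
```

The resulting path would then be passed to `CSVSource(path=...)`, just like `csv_path` in the next cell.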

```python
# Absolute path to the CSV file; it must be reachable from inside the job pod
csv_path = "/User/igz_repos/iguazio-academy/modules/core/7_feature_store/data/heart_disease_categorical.csv"
```

```python
fstore.ingest(featureset=heart_set, source=CSVSource(path=csv_path), run_config=config)
```

> 2022-04-29 19:53:52,973 [info] starting run heart-disease-all-ingest uid=a57298f28ad04f1a8058cf1291c3d794 DB=http://mlrun-api:8080
> 2022-04-29 19:53:53,150 [info] Job is running in the background, pod: heart-disease-all-ingest-x24j5
> 2022-04-29 19:53:59,873 [info] starting ingestion task to store://feature-sets/iguazio-academy/heart-disease-all:latest.
> 2022-04-29 19:54:03,322 [info] ingestion task completed, targets:
> 2022-04-29 19:54:03,322 [info] [{'name': 'parquet', 'kind': 'parquet', 'path': 'v3io:///projects/iguazio-academy/FeatureStore/heart-disease-all/parquet/sets/heart-disease-all-latest', 'status': 'created', 'updated': '2022-04-29T19:54:01.179905+00:00', 'last_written': datetime.datetime(2022, 4, 29, 19, 54, 1, 207429)}, {'name': 'nosql', 'kind': 'nosql', 'path': 'v3io:///projects/iguazio-academy/FeatureStore/heart-disease-all/nosql/sets/heart-disease-all-latest', 'status': 'created', 'updated': '2022-04-29T19:54:01.187824+00:00'}]
> 2022-04-29 19:54:03,428 [info] run

| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| iguazio-academy | ...91c3d794 | 0 | Apr 29 19:53:59 | completed | heart-disease-all-ingest | job-type=feature-ingest<br>feature-set=store://feature-sets/iguazio-academy/heart-disease-all<br>v3io_user=nick<br>kind=job<br>owner=nick<br>mlrun/client_version=0.10.0<br>host=heart-disease-all-ingest-x24j5 | | infer_options=63<br>overwrite=None<br>featureset=store://feature-sets/iguazio-academy/heart-disease-all<br>source={'kind': 'csv', 'name': '', 'path': '/User/igz_repos/iguazio-academy/modules/core/7_feature_store/data/heart_disease_categorical.csv'}<br>targets=None | featureset=store://feature-sets/iguazio-academy/heart-disease-all:latest | |

> 2022-04-29 19:54:05,798 [info] run executed, status=completed


<mlrun.model.RunObject at 0x7f6f390e3650>

You can also add a `schedule` to the source to run the ingestion job on a regular basis. The schedule is a cron expression; for example, `"0 * * * *"` runs the job at the start of every hour:

```python
fstore.ingest(
    featureset=heart_set,
    source=CSVSource(path=csv_path, schedule="0 * * * *"),  # minute 0 of every hour
    run_config=config,
)
```

> 2022-04-29 19:54:09,538 [info] starting run heart-disease-all-ingest uid=fed688d02ffc4b6ab349e7d2b81068e5 DB=http://mlrun-api:8080
> 2022-04-29 19:54:09,736 [info] task scheduled, {'schedule': '0 * * * *', 'project': 'iguazio-academy', 'name': 'heart-disease-all-ingest'}
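Cron expressions have five fields: minute, hour, day of month, month, and day of week. To make those fields concrete, here is a deliberately simplified matcher. It is only an illustration and not how MLRun evaluates schedules: it supports `*`, single numbers, and comma-separated numbers (no ranges or step values), and it ignores cron's special rule for combining day-of-month with day-of-week.

```python
from datetime import datetime


def _field_matches(spec: str, value: int) -> bool:
    """Match one cron field; supports '*', a number, or a comma list of numbers."""
    if spec == "*":
        return True
    return value in {int(part) for part in spec.split(",")}


def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if `when` falls on a time selected by the 5-field cron `expr`."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}: {expr!r}")
    # Cron numbers weekdays 0-6 starting at Sunday; datetime.weekday() has Monday=0.
    cron_weekday = (when.weekday() + 1) % 7
    values = (when.minute, when.hour, when.day, when.month, cron_weekday)
    return all(_field_matches(spec, value) for spec, value in zip(fields, values))


hourly = "0 * * * *"
print(cron_matches(hourly, datetime(2022, 4, 29, 20, 0)))   # True
print(cron_matches(hourly, datetime(2022, 4, 29, 20, 30)))  # False
```

Under the same simplification, `"30 9 * * 1"` would select 09:30 every Monday.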


---

### Real-Time Ingestion

The last (and possibly most exciting) ingestion method is real-time. It deploys an endpoint that ingests incoming records into the Feature Store as they arrive, from sources such as Kafka or HTTP.

The basic mechanism for creating an ingestion endpoint is again a `RunConfig` that describes things like the Docker image. This time, `kind="serving"` deploys a real-time serving function rather than a batch job. An example is as follows:

```python
# kind="serving" deploys a real-time serving function instead of a batch job
config = fstore.RunConfig(image="mlrun/mlrun", kind="serving")
```

Now, we use the config to deploy a real-time ingestion service with an HTTP source:

```python
url = fstore.deploy_ingestion_service(
    featureset=heart_set,
    source=HttpSource(key_field="patient_id"),
    run_config=config,
)
```

> 2022-04-29 19:54:12,383 [info] Starting remote function deploy
2022-04-29 19:54:12  (info) Deploying function
2022-04-29 19:54:12  (info) Building
2022-04-29 19:54:12  (info) Staging files and preparing base images
2022-04-29 19:54:12  (info) Building processor image
2022-04-29 19:54:14  (info) Build complete
2022-04-29 19:54:19  (info) Function deploy complete
> 2022-04-29 19:54:20,178 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-iguazio-academy-heart-disease-all-ingest.default-tenant.svc.cluster.local:8080'], 'external_invocation_urls': ['iguazio-academy-heart-disease-all-ingest-iguazio-academy.default-tenant.app.us-sales-322.iguazio-cd1.com/']}


Then, you can ingest a record by sending it as the JSON body of an HTTP POST request to the endpoint. First, take a single row of the DataFrame as a dictionary:

```python
record = data.to_dict(orient="records")[0]
record
```

{'patient_id': 'e443544b-8d9e-4f6c-9623-e24b6139aae0',
 'age': 52,
 'sex': 'male',
 'cp': 'typical_angina',
 'exang': 'no',
 'fbs': False,
 'slope': 'downsloping',
 'thal': 'normal'}

```python
resp = requests.post(url=url, json=record)
resp.json()
```

{'id': '068692da-96d8-4006-a9f3-e4bfb47c0c74'}

---