# Minimal Ride Hailing Example

![Feast Data Flow](./images/data-flow.png)

## Introduction

For this quick start, we will:
1. Register two driver feature tables, one for driver statistics and the other for driver trips. Driver statistics are updated on a daily basis, whereas driver trips are updated in real time.
2. Create a driver dataset, then use the Feast SDK to retrieve the features corresponding to these drivers from an offline store.
3. Store the features in an online store (Redis), and retrieve the features via the Feast SDK.

## Features Registry (Feast Core)

### Configuration

Configuration can be provided in three different ways:

In [None]:
# Using environmental variables
# os.environ["FEAST_CORE_URL"] = "core:6565"
# os.environ["FEAST_SERVING_URL"] = "online_serving:6566"

# Provide a map during client initialization
# options = {
#     "FEAST_CORE_URL": "core:6565",
#     "FEAST_SERVING_URL": "online_serving:6566", 
# }
# client = Client(options)

# As keyword arguments, without the `FEAST` prefix
# client = Client(core_url="core:6565", serving_url="online_serving:6566")

If you are following the quick start guide, all configuration required for the remainder of the tutorial should already have been set up as environment variables, as shown below. The configuration values may differ depending on the environment. For a full list of configurable values and their explanations, please refer to the user guide.

In [1]:
import os
from pprint import pprint
pprint({key: value for key, value in os.environ.items() if key.startswith("FEAST_")})

{'FEAST_CORE_URL': 'core:6565',
 'FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT': 'parquet',
 'FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION': 'file:///tmp/historical_feature_output',
 'FEAST_REDIS_HOST': 'redis',
 'FEAST_SERVING_URL': 'online_serving:6566',
 'FEAST_SPARK_EXTRA_OPTIONS': '--jars '
                              'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar '
                              '--conf '
                              'spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem',
 'FEAST_SPARK_HOME': '/usr/local/spark',
 'FEAST_SPARK_LAUNCHER': 'standalone',
 'FEAST_SPARK_STAGING_LOCATION': 'file:///tmp/staging',
 'FEAST_SPARK_STANDALONE_MASTER': 'local'}
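
The keyword-argument form shown earlier drops the `FEAST` prefix and uses lower-case names (`FEAST_CORE_URL` becomes `core_url`). The following is a minimal sketch of that naming convention only. The helper `feast_options_from_env` is hypothetical and not part of the Feast SDK, and applying the same rule to every `FEAST_*` variable is an assumption.

```python
def feast_options_from_env(environ, prefix="FEAST_"):
    """Map FEAST_* variables to prefix-free, lower-case keyword names.

    Hypothetical helper for illustration; not part of the Feast SDK.
    """
    return {
        key[len(prefix):].lower(): value
        for key, value in environ.items()
        if key.startswith(prefix)
    }


# A plain dict stands in for os.environ so the example is reproducible.
example_env = {
    "FEAST_CORE_URL": "core:6565",
    "FEAST_SERVING_URL": "online_serving:6566",
    "PATH": "/usr/bin",  # ignored: no FEAST_ prefix
}
options = feast_options_from_env(example_env)
print(options)
```

The resulting dictionary has the keys `core_url` and `serving_url`, matching the keyword arguments used in the third configuration method.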


### Basic Imports and Feast Client initialization

In [2]:
import os

from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat

In [3]:
client = Client()

### Declare Features and Entities

An entity defines the primary key(s) associated with one or more feature tables. The entity must be registered before declaring the associated feature tables.

In [4]:
driver_id = Entity(name="driver_id", description="Driver identifier", value_type=ValueType.INT64)

In [5]:
# Daily updated features 
acc_rate = Feature("acc_rate", ValueType.FLOAT)
conv_rate = Feature("conv_rate", ValueType.FLOAT)
avg_daily_trips = Feature("avg_daily_trips", ValueType.INT32)

# Real-time updated features
trips_today = Feature("trips_today", ValueType.INT32)

```python
FeatureTable(
    name = "driver_statistics",
    entities = ["driver_id"],
    features = [
        acc_rate,
        conv_rate,
        avg_daily_trips
    ],
    ...
)
```


```python
FeatureTable(
    name = "driver_trips",
    entities = ["driver_id"],
    features = [
        trips_today
    ],
    ...
)
```

![Features Join](./images/features-join.png)

```python
FeatureTable(
    ...,
    batch_source=FileSource(  # Required
        file_format=ParquetFormat(),
        file_url="gs://feast-demo-data-lake",
        ...
    ),
    stream_source=KafkaSource(  # Optional
        bootstrap_servers="...",
        topic="driver_trips",
        ...
    )
)
```

Feature tables group the features together and describe how they can be retrieved. The following examples assume that the feature tables are stored on the local file system and are accessible from the Spark cluster. If you have set up a GCP service account, you may use GCS instead as the file source.

`batch_source` defines where the historical features are stored. It is also possible to define an optional `stream_source`, through which feature values are delivered continuously.

For now we will define only `batch_source` for both `driver_statistics` and `driver_trips`, and demonstrate the usage of `stream_source` in a later part of the tutorial.

In [None]:
# This is the location we're using for the offline feature store.

import os
demo_data_location = os.path.join(os.getenv("FEAST_SPARK_STAGING_LOCATION", "file:///home/jovyan/"), "test_data")


In [6]:
driver_statistics_source_uri = os.path.join(demo_data_location, "driver_statistics")

driver_statistics = FeatureTable(
    name = "driver_statistics",
    entities = ["driver_id"],
    features = [
        acc_rate,
        conv_rate,
        avg_daily_trips
    ],
    batch_source=FileSource(
        event_timestamp_column="datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=driver_statistics_source_uri,
        date_partition_column="date"
    )
)

In [7]:
driver_trips_source_uri = os.path.join(demo_data_location, "driver_trips")


driver_trips = FeatureTable(
    name = "driver_trips",
    entities = ["driver_id"],
    features = [
        trips_today
    ],
    batch_source=FileSource(
        event_timestamp_column="datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=driver_trips_source_uri,
        date_partition_column="date"
    )
)

### Registering entities and feature tables in Feast Core

In [8]:
client.apply_entity(driver_id)
client.apply_feature_table(driver_statistics)
client.apply_feature_table(driver_trips)

In [9]:
print(client.get_feature_table("driver_statistics").to_yaml())
print(client.get_feature_table("driver_trips").to_yaml())

spec:
  name: driver_statistics
  entities:
  - driver_id
  features:
  - name: conv_rate
    valueType: FLOAT
  - name: avg_daily_trips
    valueType: INT32
  - name: acc_rate
    valueType: FLOAT
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: file:///home/jovyan/driver_statistics
meta:
  createdTimestamp: '2020-10-22T02:15:16Z'

spec:
  name: driver_trips
  entities:
  - driver_id
  features:
  - name: trips_today
    valueType: INT32
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: file:///home/jovyan/driver_trips
meta:
  createdTimestamp: '2020-10-22T02:15:16Z'



### Populating batch source

Feast is agnostic to how the batch source is populated, as long as the data complies with the feature table specification. Therefore, any existing ETL tool can be used for data ingestion. Alternatively, you can use the Feast SDK to ingest a pandas DataFrame into the batch source.
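
As a rough illustration of what complying with the specification could mean for a dataset, the sketch below checks that a set of column names covers the entities, the features, and the timestamp columns declared for a feature table. The helper `missing_columns` is hypothetical, and the exact set of columns Feast requires is an assumption based on the `driver_trips` definition above; the date partition column is left out because the DataFrames ingested in this tutorial do not contain it.

```python
def missing_columns(columns, entities, features,
                    event_timestamp_column, created_timestamp_column=None):
    """Return the declared columns that are absent from `columns`.

    Hypothetical helper for illustration; not part of the Feast SDK.
    """
    required = list(entities) + list(features) + [event_timestamp_column]
    if created_timestamp_column:
        required.append(created_timestamp_column)
    present = set(columns)
    # Preserve declaration order so the report is easy to read.
    return [name for name in required if name not in present]


# Column names of a dataset that lacks the created timestamp.
trips_columns = ["driver_id", "trips_today", "datetime"]
missing = missing_columns(
    trips_columns,
    entities=["driver_id"],
    features=["trips_today"],
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)
print(missing)  # ['created']
```

With a pandas DataFrame, the first argument would be `df.columns`.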

In [10]:
import pandas as pd
import numpy as np
from datetime import datetime

In [11]:
def generate_entities():
    return np.random.choice(999999, size=100, replace=False)

In [12]:
def generate_trips(entities):
    df = pd.DataFrame(columns=["driver_id", "trips_today", "datetime", "created"])
    df['driver_id'] = entities
    df['trips_today'] = np.random.randint(0, 1000, size=100).astype(np.int32)
    df['datetime'] = pd.to_datetime(
            np.random.randint(
                datetime(2020, 10, 10).timestamp(),
                datetime(2020, 10, 20).timestamp(),
                size=100),
        unit="s"
    )
    df['created'] = pd.to_datetime(datetime.now())
    return df
    

In [13]:
def generate_stats(entities):
    df = pd.DataFrame(columns=["driver_id", "conv_rate", "acc_rate", "avg_daily_trips", "datetime", "created"])
    df['driver_id'] = entities
    df['conv_rate'] = np.random.random(size=100).astype(np.float32)
    df['acc_rate'] = np.random.random(size=100).astype(np.float32)
    df['avg_daily_trips'] = np.random.randint(0, 1000, size=100).astype(np.int32)
    df['datetime'] = pd.to_datetime(
            np.random.randint(
                datetime(2020, 10, 10).timestamp(),
                datetime(2020, 10, 20).timestamp(),
                size=100),
        unit="s"
    )
    df['created'] = pd.to_datetime(datetime.now())
    return df

In [14]:
entities = generate_entities()
stats_df = generate_stats(entities)
trips_df = generate_trips(entities)

In [15]:
client.ingest(driver_statistics, stats_df)
client.ingest(driver_trips, trips_df)

Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.
Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.


## Historical Retrieval For Training

### Point-in-time correction

![Point In Time](./images/pit-2.png)

Feast joins the features to the entities based on the following conditions:
1. The entity primary key value(s) match.
2. The feature event timestamp is the closest possible match to the entity event timestamp,
   but must not be more recent than the entity event timestamp. If a maximum age is specified
   in the feature table, the difference between the two timestamps must not exceed it.
3. If more than one feature table row satisfies conditions 1 and 2, the row with the
   most recent created timestamp is chosen.
4. If no feature row satisfies the conditions above, the feature values will be null.
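
The conditions above can be sketched in plain Python for a single entity row. This is an illustration of the join rules only, not how Feast implements them (the actual join runs as a Spark job). The function `point_in_time_lookup` and the sample rows are hypothetical; the column names `datetime` and `created` follow the feature tables defined earlier.

```python
from datetime import datetime, timedelta


def point_in_time_lookup(entity_row, feature_rows, key, max_age=None):
    """Return the feature row selected for one entity row, or None."""
    candidates = []
    for row in feature_rows:
        if row[key] != entity_row[key]:
            continue  # condition 1: primary key must match
        age = entity_row["event_timestamp"] - row["datetime"]
        if age < timedelta(0):
            continue  # condition 2: feature must not be newer than the entity row
        if max_age is not None and age > max_age:
            continue  # condition 2: feature must not exceed the maximum age
        candidates.append(row)
    if not candidates:
        return None  # condition 4: caller fills the features with nulls
    # Closest event timestamp wins; ties go to the most recent created timestamp.
    return max(candidates, key=lambda r: (r["datetime"], r["created"]))


feature_rows = [
    {"driver_id": 1, "trips_today": 5,
     "datetime": datetime(2020, 10, 18, 9), "created": datetime(2020, 10, 18, 10)},
    {"driver_id": 1, "trips_today": 7,  # same event time, created later
     "datetime": datetime(2020, 10, 18, 9), "created": datetime(2020, 10, 18, 11)},
    {"driver_id": 1, "trips_today": 9,  # newer than the entity row, so excluded
     "datetime": datetime(2020, 10, 19, 9), "created": datetime(2020, 10, 19, 9)},
    {"driver_id": 2, "trips_today": 3,
     "datetime": datetime(2020, 10, 10), "created": datetime(2020, 10, 10)},
]
driver_1 = {"driver_id": 1, "event_timestamp": datetime(2020, 10, 18, 12)}
driver_2 = {"driver_id": 2, "event_timestamp": datetime(2020, 10, 18, 12)}

selected = point_in_time_lookup(driver_1, feature_rows, "driver_id")
print(selected["trips_today"])  # 7
stale = point_in_time_lookup(driver_2, feature_rows, "driver_id",
                             max_age=timedelta(days=1))
print(stale)  # None
```

For driver 2 the only matching row is more than eight days old, so with a one-day maximum age nothing is selected; without a maximum age the same row would be returned.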

In [16]:
import gcsfs
from pyarrow.parquet import ParquetDataset
from urllib.parse import urlparse

In [17]:
def read_parquet(uri):
    parsed_uri = urlparse(uri)
    if parsed_uri.scheme == "file":
        return pd.read_parquet(parsed_uri.path)
    elif parsed_uri.scheme == "gs":
        fs = gcsfs.GCSFileSystem()
        files = ["gs://" + path for path in fs.glob(uri + '/part-*')]
        ds = ParquetDataset(files, filesystem=fs)
        return ds.read().to_pandas()
    elif parsed_uri.scheme == 's3':
        import s3fs
        fs = s3fs.S3FileSystem()
        files = ["s3://" + path for path in fs.glob(uri + '/part-*')]
        ds = ParquetDataset(files, filesystem=fs)
        return ds.read().to_pandas()
    else:
        raise ValueError(f"Unsupported URL scheme {uri}")

In [18]:
entities_with_timestamp = pd.DataFrame(columns=['driver_id', 'event_timestamp'])
entities_with_timestamp['driver_id'] = np.random.choice(entities, 10, replace=False)
entities_with_timestamp['event_timestamp'] = pd.to_datetime(np.random.randint(
    datetime(2020, 10, 18).timestamp(),
    datetime(2020, 10, 20).timestamp(),
    size=10), unit='s')
entities_with_timestamp

| | driver_id | event_timestamp |
|---|---|---|
| 0 | 153811 | 2020-10-19 15:21:20 |
| 1 | 195236 | 2020-10-18 19:07:53 |
| 2 | 673969 | 2020-10-18 20:06:02 |
| 3 | 832049 | 2020-10-18 00:10:00 |
| 4 | 85798 | 2020-10-18 22:09:50 |
| 5 | 809726 | 2020-10-19 00:19:19 |
| 6 | 468897 | 2020-10-18 19:15:25 |
| 7 | 309579 | 2020-10-19 11:31:34 |
| 8 | 619146 | 2020-10-19 02:00:57 |
| 9 | 161665 | 2020-10-18 02:56:24 |


In [19]:
# get_historical_features will return immediately once the Spark job has been submitted successfully.
job = client.get_historical_features(
    feature_refs=[
        "driver_statistics:avg_daily_trips",
        "driver_statistics:conv_rate",
        "driver_statistics:acc_rate",
        "driver_trips:trips_today"
    ], 
    entity_source=entities_with_timestamp
)

In [20]:
# get_output_file_uri will block until the Spark job is completed.
output_file_uri = job.get_output_file_uri()

In [21]:
read_parquet(output_file_uri)

| | driver_id | event_timestamp | driver_statistics__conv_rate | driver_statistics__avg_daily_trips | driver_statistics__acc_rate | driver_trips__trips_today |
|---|---|---|---|---|---|---|
| 0 | 619146 | 2020-10-19 02:00:57 | 0.772141 | 789 | 0.343955 | 954.0 |
| 1 | 153811 | 2020-10-19 15:21:20 | 0.755364 | 993 | 0.758154 | 632.0 |
| 2 | 809726 | 2020-10-19 00:19:19 | 0.80659 | 899 | 0.017062 | 414.0 |
| 3 | 85798 | 2020-10-18 22:09:50 | 0.564337 | 485 | 0.960244 | 778.0 |
| 4 | 832049 | 2020-10-18 00:10:00 | 0.335728 | 275 | 0.125462 | 837.0 |
| 5 | 195236 | 2020-10-18 19:07:53 | 0.941593 | 228 | 0.494385 | 719.0 |
| 6 | 161665 | 2020-10-18 02:56:24 | 0.844367 | 505 | 0.996184 | 868.0 |
| 7 | 468897 | 2020-10-18 19:15:25 | 0.814076 | 136 | 0.763832 | 539.0 |
| 8 | 673969 | 2020-10-18 20:06:02 | 0.493867 | 327 | 0.363619 | |
| 9 | 309579 | 2020-10-19 11:31:34 | 0.229143 | 979 | 0.619767 | 867.0 |


The retrieved result can now be used for model training.
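
In the output above, each feature requested as `<table>:<feature>` appears as a column named `<table>__<feature>`. The sketch below reproduces that naming so the columns can be selected programmatically. The helper `output_column` is hypothetical, and the naming rule is inferred from the output shown here rather than taken from the Feast documentation.

```python
def output_column(feature_ref):
    """Derive the historical-output column name for a 'table:feature' reference.

    Hypothetical helper; the naming rule is inferred from the output above.
    """
    table, separator, feature = feature_ref.partition(":")
    if not separator or not table or not feature:
        raise ValueError(f"Expected '<table>:<feature>', got {feature_ref!r}")
    return f"{table}__{feature}"


feature_refs = [
    "driver_statistics:avg_daily_trips",
    "driver_statistics:conv_rate",
    "driver_statistics:acc_rate",
    "driver_trips:trips_today",
]
feature_columns = [output_column(ref) for ref in feature_refs]
print(feature_columns)
```

The resulting list can be used to pick the model inputs out of the retrieved DataFrame while leaving `driver_id` and `event_timestamp` aside.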

## Populating Online Storage with Batch Ingestion

To populate the online storage, we can use the Feast SDK to start a Spark batch job that extracts the features from the batch source and loads them into an online store.

In [22]:
job = client.start_offline_to_online_ingestion(
    driver_statistics,
    datetime(2020, 10, 10),
    datetime(2020, 10, 20)
)

In [23]:
# It will take some time before the Spark Job is completed
job.get_status()

<SparkJobStatus.COMPLETED: 3>

Once the job is completed, the SDK can be used to retrieve the result from the online store.
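
Because the ingestion job runs asynchronously, a single status check may happen before it has finished. A minimal polling sketch follows. The helper `wait_until` is hypothetical and not part of the Feast SDK, and the status strings in the demonstration are placeholders rather than actual `SparkJobStatus` members.

```python
import time


def wait_until(get_status, done_statuses, timeout_s=600.0, poll_interval_s=5.0):
    """Call get_status() repeatedly until it returns a value in done_statuses.

    Hypothetical helper for illustration; not part of the Feast SDK.
    Raises TimeoutError if no terminal status is seen within timeout_s.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in done_statuses:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Last observed status: {status!r}")
        time.sleep(poll_interval_s)


# Stand-in for a job whose status changes on each check (placeholder values).
fake_statuses = iter(["STARTING", "IN_PROGRESS", "COMPLETED"])
final_status = wait_until(lambda: next(fake_statuses), {"COMPLETED"},
                          poll_interval_s=0)
print(final_status)  # COMPLETED
```

With a real job, `job.get_status` would be passed as `get_status`. The set of terminal statuses should also include any failure status the SDK defines, otherwise a failed job is only noticed when the timeout expires.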

In [24]:
entities_sample = np.random.choice(entities, 10, replace=False)
entities_sample = [{"driver_id": e} for e in entities_sample]
entities_sample

[{'driver_id': 100651},
 {'driver_id': 561506},
 {'driver_id': 304334},
 {'driver_id': 997026},
 {'driver_id': 420223},
 {'driver_id': 546179},
 {'driver_id': 308686},
 {'driver_id': 888397},
 {'driver_id': 731657},
 {'driver_id': 684814}]

In [25]:
features = client.get_online_features(
    feature_refs=["driver_statistics:avg_daily_trips"],
    entity_rows=entities_sample).to_dict()
features

{'driver_id': [100651,
  561506,
  304334,
  997026,
  420223,
  546179,
  308686,
  888397,
  731657,
  684814],
 'driver_statistics:avg_daily_trips': [614,
  71,
  801,
  341,
  158,
  747,
  509,
  568,
  844,
  315]}

In [26]:
pd.DataFrame(features)

| | driver_id | driver_statistics:avg_daily_trips |
|---|---|---|
| 0 | 100651 | 614 |
| 1 | 561506 | 71 |
| 2 | 304334 | 801 |
| 3 | 997026 | 341 |
| 4 | 420223 | 158 |
| 5 | 546179 | 747 |
| 6 | 308686 | 509 |
| 7 | 888397 | 568 |
| 8 | 731657 | 844 |
| 9 | 684814 | 315 |


The features can now be used as an input to the trained model.
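
The dictionary returned by `to_dict()` above is column-oriented: one list per feature, aligned by position. When a model expects one record per driver and pandas is not available, the lists can be pivoted with the standard library alone. The helper `columns_to_rows` is hypothetical; the sample values are the first three entries of the output shown above.

```python
def columns_to_rows(columns):
    """Turn {'name': [v0, v1, ...]} into [{'name': v0}, {'name': v1}, ...].

    Hypothetical helper for illustration; not part of the Feast SDK.
    """
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError("All columns must have the same length")
    names = list(columns)
    return [dict(zip(names, values)) for values in zip(*columns.values())]


online_features = {
    "driver_id": [100651, 561506, 304334],
    "driver_statistics:avg_daily_trips": [614, 71, 801],
}
rows = columns_to_rows(online_features)
print(rows[0])
```

Each element of `rows` now holds the entity key together with its feature values.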

## Ingestion from Streaming (real-time) Source

With a streaming source, we can use the Feast SDK to launch a Spark streaming job that continuously updates the online store. First, we will update the `driver_trips` feature table to add a new streaming source.

In [None]:
!pip install confluent_kafka

In [27]:
import json
import pytz
import io
import avro.schema
from avro.io import BinaryEncoder, DatumWriter
from confluent_kafka import Producer

In [28]:
# Change this to the address of any Kafka broker that is accessible from the Spark cluster
KAFKA_BROKER = os.getenv("DEMO_KAFKA_BROKERS", "kafka:9092")

In [29]:
avro_schema_json = json.dumps({
    "type": "record",
    "name": "DriverTrips",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "trips_today", "type": "int"},
        {
            "name": "datetime",
            "type": {"type": "long", "logicalType": "timestamp-micros"},
        },
    ],
})

In [30]:
driver_trips.stream_source = KafkaSource(
    event_timestamp_column="datetime",
    created_timestamp_column="datetime",
    bootstrap_servers=KAFKA_BROKER,
    topic="driver_trips",
    message_format=AvroFormat(avro_schema_json)
)
client.apply_feature_table(driver_trips)

Start the streaming job and send Avro records to Kafka:

In [31]:
job = client.start_stream_to_online_ingestion(
    driver_trips
)

In [32]:
def send_avro_record_to_kafka(topic, record):
    value_schema = avro.schema.parse(avro_schema_json)
    writer = DatumWriter(value_schema)
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer.write(record, encoder)
    
    producer = Producer({
        "bootstrap.servers": KAFKA_BROKER,
    })
    producer.produce(topic=topic, value=bytes_writer.getvalue())
    producer.flush()

In [33]:
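# "records" is the documented orient name; the abbreviation "record" is rejected by recent pandas versions.
for record in trips_df.drop(columns=['created']).to_dict('records'):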
    record["datetime"] = (
        record["datetime"].to_pydatetime().replace(tzinfo=pytz.utc)
    )

    send_avro_record_to_kafka(topic="driver_trips", record=record)
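
The `datetime` field in the schema above uses the Avro `timestamp-micros` logical type, which is stored as a long counting microseconds since the Unix epoch (UTC). The loop above hands timezone-aware `datetime` objects to the Avro writer and relies on the library for the conversion. The sketch below shows the underlying arithmetic using only the standard library; the helper `to_timestamp_micros` is hypothetical and not part of the `avro` package.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_micros(value):
    """Convert a timezone-aware datetime to microseconds since the Unix epoch.

    Hypothetical helper for illustration; not part of the avro package.
    """
    if value.tzinfo is None:
        raise ValueError("Expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (value - EPOCH) // timedelta(microseconds=1)


event_time = datetime(2020, 10, 19, 2, 0, 57, tzinfo=timezone.utc)
micros = to_timestamp_micros(event_time)
# Converting back recovers the original instant.
print(EPOCH + timedelta(microseconds=micros) == event_time)  # True
```

Rejecting naive datetimes mirrors the reason the loop above attaches `pytz.utc` explicitly: without a timezone, the instant the value refers to is ambiguous.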

### Retrieving joined features from several feature tables

In [34]:
entities_sample = np.random.choice(entities, 10, replace=False)
entities_sample = [{"driver_id": e} for e in entities_sample]
entities_sample

[{'driver_id': 684814},
 {'driver_id': 60009},
 {'driver_id': 94674},
 {'driver_id': 977538},
 {'driver_id': 861519},
 {'driver_id': 85798},
 {'driver_id': 594463},
 {'driver_id': 132034},
 {'driver_id': 268644},
 {'driver_id': 369157}]

In [35]:
features = client.get_online_features(
    feature_refs=["driver_statistics:avg_daily_trips", "driver_trips:trips_today"],
    entity_rows=entities_sample).to_dict()

In [36]:
pd.DataFrame(features)

| | driver_id | driver_statistics:avg_daily_trips | driver_trips:trips_today |
|---|---|---|---|
| 0 | 684814 | 315 | |
| 1 | 60009 | 266 | |
| 2 | 94674 | 80 | |
| 3 | 977538 | 685 | |
| 4 | 861519 | 140 | |
| 5 | 85798 | 485 | |
| 6 | 594463 | 315 | |
| 7 | 132034 | 188 | |
| 8 | 268644 | 849 | |
| 9 | 369157 | 129 | |


In [37]:
# This will stop the streaming job
job.cancel()