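# Minimal Ride Hailing Example

This notebook walks through a minimal Feast workflow on Kubeflow:
- register a driver entity and two feature tables in Feast Core;
- ingest synthetic data into a GCS batch source;
- build a training dataset with historical (point-in-time) retrieval;
- load features into the online store and read them back.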

### Configuration

Restart the kernel after installing these packages so that the newly installed versions are imported.

In [10]:
%pip install protobuf gcsfs feast -U -q

Note: you may need to restart the kernel to use updated packages.


In [1]:
import random 
staging_bucket = f'gs://feast-staging-bucket-{random.randint(1000000, 10000000)}/'
!gsutil mb {staging_bucket}

Creating gs://feast-staging-bucket-4179836/...


### Basic Imports and Feast Client initialization

In [2]:
import os
import posixpath
import time
from datetime import timezone

from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat

In [10]:
# Endpoints of the Feast services deployed in the `kubeflow` namespace, plus the
# Spark-on-Kubernetes settings used by the ingestion and retrieval jobs below.
feast_client_options = dict(
    core_url="kf-feast-core.kubeflow.svc:6565",
    serving_url="feast-online-serving.kubeflow.svc:6566",
    spark_launcher="k8s",
    spark_staging_location=staging_bucket,
    spark_k8s_namespace="kubeflow",
    redis_host="kf-feast-redis-headless.kubeflow.svc",
    historical_feature_output_location=f"{staging_bucket}historical",
)
client = Client(**feast_client_options)

### Declare Features and Entities

In [4]:
# Join key for both feature tables defined below (entities=["driver_id"]).
driver_id = Entity(
    name="driver_id",
    description="Driver identifier",
    value_type=ValueType.INT64,
)

In [5]:
# Daily updated features (used by the `driver_statistics` table)
acc_rate = Feature("acc_rate", ValueType.FLOAT)
conv_rate = Feature("conv_rate", ValueType.FLOAT)
avg_daily_trips = Feature("avg_daily_trips", ValueType.INT32)

# Real-time updated features (used by the `driver_trips` table).
# NOTE(review): in this notebook `driver_trips` is only given a batch (Parquet)
# source, so `trips_today` is not actually streamed here.
trips_today = Feature("trips_today", ValueType.INT32)

In [6]:
# Offline (batch source) data for this demo is stored under this location.
# This is a URI, so join with posixpath: os.path.join would use "\" as the
# separator on Windows and produce an invalid gs:// path.
demo_data_location = posixpath.join(
    os.getenv("FEAST_SPARK_STAGING_LOCATION", staging_bucket), "test_data"
)
print(demo_data_location)

gs://feast-staging-bucket-4179836/test_data


In [7]:
# URI of the Parquet batch source for the daily driver statistics. Joined with
# posixpath (not os.path) because URIs always use "/" separators.
driver_statistics_source_uri = posixpath.join(demo_data_location, "driver_statistics")

# Feature table keyed by driver_id. Rows are timestamped by the "datetime"
# column; "created" records when each row was written.
driver_statistics = FeatureTable(
    name="driver_statistics",
    entities=["driver_id"],
    features=[
        acc_rate,
        conv_rate,
        avg_daily_trips,
    ],
    batch_source=FileSource(
        event_timestamp_column="datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=driver_statistics_source_uri,
        date_partition_column="date",
    ),
)

In [8]:
# URI of the Parquet batch source for per-driver trip counts. Joined with
# posixpath (not os.path) because URIs always use "/" separators.
driver_trips_source_uri = posixpath.join(demo_data_location, "driver_trips")

# Feature table keyed by driver_id. Rows are timestamped by the "datetime"
# column; "created" records when each row was written.
driver_trips = FeatureTable(
    name="driver_trips",
    entities=["driver_id"],
    features=[
        trips_today,
    ],
    batch_source=FileSource(
        event_timestamp_column="datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=driver_trips_source_uri,
        date_partition_column="date",
    ),
)

### Registering entities and feature tables in Feast Core

In [11]:
# Register the entity first, then the two feature tables that use it as their join key.
for feast_object in (driver_id, driver_statistics, driver_trips):
    client.apply(feast_object)

In [12]:
# Read the definitions back from Feast Core to confirm they were registered.
for table_name in ("driver_statistics", "driver_trips"):
    print(client.get_feature_table(table_name).to_yaml())

spec:
  name: driver_statistics
  entities:
  - driver_id
  features:
  - name: avg_daily_trips
    valueType: INT32
  - name: conv_rate
    valueType: FLOAT
  - name: acc_rate
    valueType: FLOAT
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: gs://feast-staging-bucket-4179836/test_data/driver_statistics
meta:
  createdTimestamp: '2021-02-03T05:50:53Z'

spec:
  name: driver_trips
  entities:
  - driver_id
  features:
  - name: trips_today
    valueType: INT32
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: gs://feast-staging-bucket-4179836/test_data/driver_trips
meta:
  createdTimestamp: '2021-02-03T05:50:53Z'



### Populating batch source

Feast is agnostic to how the batch source is populated, as long as the data complies with the feature table specification. Therefore, any existing ETL tool can be used for data ingestion. Alternatively, you can use the Feast SDK to ingest a pandas DataFrame into the batch source.

In [13]:
import pandas as pd
import numpy as np
from datetime import datetime

In [14]:
def generate_entities(size=100, max_id=999999):
    """Draw ``size`` distinct random driver ids from ``range(max_id)``.

    Args:
        size: Number of ids to return (default 100, as before).
        max_id: Exclusive upper bound of the id range (default 999999).

    Returns:
        1-D numpy integer array of unique ids, in random order.

    Raises:
        ValueError: Raised by numpy if ``size > max_id`` (sampling is without replacement).
    """
    return np.random.choice(max_id, size=size, replace=False)

In [15]:
def generate_trips(entities):
    """Build a synthetic ``driver_trips`` frame with one row per entity.

    Args:
        entities: 1-D array-like of driver ids. One row is generated per id, so
            any number of ids is accepted (previously exactly 100 was required).

    Returns:
        DataFrame with these columns, in order:
        - ``driver_id``: the given ids.
        - ``trips_today``: int32 in [0, 1000).
        - ``datetime``: event timestamp drawn uniformly, at one-second
          granularity, from [2020-10-10, 2020-10-20) UTC.
        - ``created``: the kernel's local wall-clock time when this ran.
    """
    n_rows = len(entities)
    # Compute the window bounds in UTC. pd.to_datetime(unit="s") interprets epoch
    # seconds as UTC, whereas a naive datetime's .timestamp() uses the kernel's
    # local timezone and would shift the window by the local UTC offset.
    # int() because randint expects integer bounds.
    window_start = int(datetime(2020, 10, 10, tzinfo=timezone.utc).timestamp())
    window_end = int(datetime(2020, 10, 20, tzinfo=timezone.utc).timestamp())
    # Dict entries are evaluated in order, so the random draws happen in the
    # same sequence as before (trips, then timestamps).
    return pd.DataFrame({
        "driver_id": entities,
        "trips_today": np.random.randint(0, 1000, size=n_rows).astype(np.int32),
        "datetime": pd.to_datetime(
            np.random.randint(window_start, window_end, size=n_rows), unit="s"
        ),
        "created": pd.to_datetime(datetime.now()),
    })
    

In [16]:
def generate_stats(entities):
    """Build a synthetic ``driver_statistics`` frame with one row per entity.

    Args:
        entities: 1-D array-like of driver ids. One row is generated per id, so
            any number of ids is accepted (previously exactly 100 was required).

    Returns:
        DataFrame with these columns, in order:
        - ``driver_id``: the given ids.
        - ``conv_rate`` and ``acc_rate``: float32 in [0, 1).
        - ``avg_daily_trips``: int32 in [0, 1000).
        - ``datetime``: event timestamp drawn uniformly, at one-second
          granularity, from [2020-10-10, 2020-10-20) UTC.
        - ``created``: the kernel's local wall-clock time when this ran.
    """
    n_rows = len(entities)
    # Compute the window bounds in UTC. pd.to_datetime(unit="s") interprets epoch
    # seconds as UTC, whereas a naive datetime's .timestamp() uses the kernel's
    # local timezone and would shift the window by the local UTC offset.
    # int() because randint expects integer bounds.
    window_start = int(datetime(2020, 10, 10, tzinfo=timezone.utc).timestamp())
    window_end = int(datetime(2020, 10, 20, tzinfo=timezone.utc).timestamp())
    # Dict entries are evaluated in order, so the random draws happen in the
    # same sequence as before (conv_rate, acc_rate, avg_daily_trips, datetime).
    return pd.DataFrame({
        "driver_id": entities,
        "conv_rate": np.random.random(size=n_rows).astype(np.float32),
        "acc_rate": np.random.random(size=n_rows).astype(np.float32),
        "avg_daily_trips": np.random.randint(0, 1000, size=n_rows).astype(np.int32),
        "datetime": pd.to_datetime(
            np.random.randint(window_start, window_end, size=n_rows), unit="s"
        ),
        "created": pd.to_datetime(datetime.now()),
    })

In [17]:
# One shared pool of driver ids, so both feature tables describe the same drivers.
entities = generate_entities()
stats_df, trips_df = generate_stats(entities), generate_trips(entities)

In [18]:
# Write each synthetic frame into the batch source of its feature table.
for feature_table, source_df in ((driver_statistics, stats_df), (driver_trips, trips_df)):
    client.ingest(feature_table, source_df)

Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.
Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.


## Historical Retrieval For Training

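Create a training dataset from the offline feature tables. For each entity row, Feast joins the feature values that were valid as of that row's `event_timestamp` (a point-in-time join).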

In [19]:
import gcsfs
from pyarrow.parquet import ParquetDataset
from urllib.parse import urlparse

In [20]:
# Entity rows for the point-in-time join: 10 of the generated drivers, each
# paired with the timestamp as of which its feature values should be looked up.
# The window bounds are computed in UTC because pd.to_datetime(unit='s')
# interprets epoch seconds as UTC, whereas a naive datetime's .timestamp() uses
# the kernel's local timezone and would shift the window.
retrieval_window_start = int(datetime(2020, 10, 18, tzinfo=timezone.utc).timestamp())
retrieval_window_end = int(datetime(2020, 10, 20, tzinfo=timezone.utc).timestamp())
entities_with_timestamp = pd.DataFrame({
    'driver_id': np.random.choice(entities, 10, replace=False),
    'event_timestamp': pd.to_datetime(
        np.random.randint(retrieval_window_start, retrieval_window_end, size=10),
        unit='s',
    ),
})
entities_with_timestamp

Unnamed: 0,driver_id,event_timestamp
0,441896,2020-10-18 06:02:29
1,59256,2020-10-19 06:02:31
2,79041,2020-10-19 00:58:04
3,210520,2020-10-18 00:51:04
4,552755,2020-10-18 19:15:34
5,991178,2020-10-18 07:27:46
6,577396,2020-10-19 10:46:28
7,401906,2020-10-19 10:58:32
8,135710,2020-10-18 08:22:22
9,849882,2020-10-19 07:20:05


In [22]:
# Features to join onto the entity rows, written as "<feature table>:<feature>".
training_feature_refs = [
    "driver_statistics:avg_daily_trips",
    "driver_statistics:conv_rate",
    "driver_statistics:acc_rate",
    "driver_trips:trips_today",
]

# get_historical_features returns immediately once the Spark job has been submitted successfully.
job = client.get_historical_features(
    feature_refs=training_feature_refs,
    entity_source=entities_with_timestamp,
)

In [23]:
# get_output_file_uri will block until the Spark job is completed, then return
# the job's output location (read in the next cell as Parquet `part-*` files).
output_file_uri = job.get_output_file_uri()

In [24]:
# Retrieve the remote training dataset: the Spark job writes Parquet `part-*`
# files under `output_file_uri`.
fs = gcsfs.GCSFileSystem()
# rstrip avoids a "//" in the pattern if the URI already ends with a slash.
output_glob = output_file_uri.rstrip('/') + '/part-*'
# fs.glob returns bucket paths without the "gs://" scheme, so add it back.
files = ["gs://" + path for path in fs.glob(output_glob)]
# Fail with a clear message rather than an opaque pyarrow error on an empty list.
if not files:
    raise FileNotFoundError(f"No Parquet part files found matching {output_glob}")
ds = ParquetDataset(files, filesystem=fs)
training_df = ds.read().to_pandas()
training_df

Unnamed: 0,driver_id,event_timestamp,driver_statistics__avg_daily_trips,driver_statistics__conv_rate,driver_statistics__acc_rate,driver_trips__trips_today
0,552755,2020-10-18 19:15:34,787,0.601664,0.444756,617.0
1,441896,2020-10-18 06:02:29,493,0.348591,0.910286,405.0
2,849882,2020-10-19 07:20:05,534,0.927652,0.397792,988.0
3,135710,2020-10-18 08:22:22,688,0.477989,0.363104,54.0
4,577396,2020-10-19 10:46:28,772,0.05913,0.06862,121.0
5,79041,2020-10-19 00:58:04,261,0.104633,0.109721,888.0
6,59256,2020-10-19 06:02:31,194,0.170629,0.715083,731.0
7,401906,2020-10-19 10:58:32,289,0.658645,0.838213,854.0
8,210520,2020-10-18 00:51:04,951,0.119565,0.845131,
9,991178,2020-10-18 07:27:46,632,0.699143,0.34541,607.0


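The retrieved result can now be used for model training. A missing value (such as `trips_today` in one of the rows above) means Feast found no feature row for that driver at or before the row's `event_timestamp`. The point-in-time join never uses feature values from the future.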

## Populating Online Storage with Batch Ingestion

To populate the online store, use the Feast SDK to start a Spark batch job. The job extracts the features from the batch source and loads them into the online store.

In [25]:
# Start a Spark job that loads `driver_statistics` from its batch source into
# the online store for the 2020-10-10 .. 2020-10-20 window.
ingestion_start = datetime(2020, 10, 10)
ingestion_end = datetime(2020, 10, 20)
job = client.start_offline_to_online_ingestion(driver_statistics, ingestion_start, ingestion_end)

In [34]:
# The ingestion job runs asynchronously on Spark. Poll until it reaches a
# terminal state so that "Restart & Run All" does not query the online store
# before the data is loaded (a single get_status() call returns immediately).
POLL_INTERVAL_S = 10
INGESTION_TIMEOUT_S = 30 * 60
deadline = time.monotonic() + INGESTION_TIMEOUT_S
ingestion_status = job.get_status()
while ingestion_status.name not in ("COMPLETED", "FAILED"):
    if time.monotonic() > deadline:
        raise TimeoutError(
            f"Ingestion job still {ingestion_status.name} after {INGESTION_TIMEOUT_S}s"
        )
    time.sleep(POLL_INTERVAL_S)
    ingestion_status = job.get_status()
if ingestion_status.name == "FAILED":
    raise RuntimeError("Offline-to-online ingestion job failed; check the Spark driver logs")
ingestion_status

<SparkJobStatus.COMPLETED: 3>

Once the job is completed, the SDK can be used to retrieve the result from the online store.

In [35]:
# Online lookup keys: 10 random drivers from the generated pool, one
# {"driver_id": ...} row per driver.
entities_sample = [{"driver_id": driver} for driver in np.random.choice(entities, 10, replace=False)]
entities_sample

[{'driver_id': 432527},
 {'driver_id': 66847},
 {'driver_id': 648263},
 {'driver_id': 955983},
 {'driver_id': 650942},
 {'driver_id': 638553},
 {'driver_id': 873512},
 {'driver_id': 480137},
 {'driver_id': 811349},
 {'driver_id': 987187}]

In [36]:
# Look up the latest ingested value of each requested feature for the sampled drivers.
online_feature_refs = ["driver_statistics:avg_daily_trips"]
online_response = client.get_online_features(
    feature_refs=online_feature_refs,
    entity_rows=entities_sample,
)
features = online_response.to_dict()
features

{'driver_statistics:avg_daily_trips': [535,
  916,
  643,
  142,
  317,
  266,
  552,
  949,
  93,
  694],
 'driver_id': [432527,
  66847,
  648263,
  955983,
  650942,
  638553,
  873512,
  480137,
  811349,
  987187]}

In [37]:
# Tabulate the online lookup result: one column per returned key, one row per requested entity.
pd.DataFrame(features)

Unnamed: 0,driver_statistics:avg_daily_trips,driver_id
0,535,432527
1,916,66847
2,643,648263
3,142,955983
4,317,650942
5,266,638553
6,552,873512
7,949,480137
8,93,811349
9,694,987187
