# Ride Hailing Example

![chart](https://feaststore.blob.core.windows.net/feastjar/FeastArchitectureNew.png)

In [1]:
import io
import json
import os
from datetime import datetime
from urllib.parse import urlparse

import avro.schema
import feast_spark
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import pytz
from avro.io import BinaryEncoder, DatumWriter
from azure.identity import ClientSecretCredential, DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient
from confluent_kafka import Producer
from feast import Client, Entity, Feature, FeatureTable, ValueType
from feast.data_format import AvroFormat, ParquetFormat
from feast.data_source import FileSource, KafkaSource
from google.protobuf.duration_pb2 import Duration
from pyarrow.parquet import ParquetDataset

## Introduction

For this demo, we will:

1. Register two driver feature tables, one for driver statistics, the other for driver trips. Driver statistics are updated on a daily basis, whereas driver trips are updated in real time.
2. Create a driver dataset, then use the Feast SDK to retrieve the features corresponding to these drivers from an offline store.
3. Store the features in an online store (Redis), and retrieve the features via Feast SDK.

## Features Registry (Feast Core)

### Configuration

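Configurations can be provided in three different ways: through environment variables, through a configuration file, or as arguments passed to the `Client` constructor. This notebook uses environment variables, set in the cell below: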

In [2]:
# Feast / Azure configuration for this notebook, supplied as environment variables.
# Every value below is a placeholder: replace it with the value for your own deployment.

# Note: get_historical_features will return immediately once the Spark job has been submitted successfully.
os.environ["FEAST_SPARK_LAUNCHER"] = "synapse"
os.environ["FEAST_SPARK_HOME"] = "/usr/local/spark"

# NOTE(review): these two keys are lower-case after the FEAST_ prefix, unlike every other
# key in this cell - confirm the launcher reads them with this exact casing.
os.environ["FEAST_azure_synapse_dev_url"] = "https://SYNAPSE-DEV-URL.dev.azuresynapse.net"
os.environ["FEAST_azure_synapse_pool_name"] = "synapsepoolname"

# Data lake directory; the historical feature output location below is a sub-directory of it.
os.environ["FEAST_AZURE_SYNAPSE_DATALAKE_DIR"] = "abfss://DEFAULT-SYNAPSE-URL@DEFAULT-SYNAPSE-ADLS-ACCOUNT.dfs.core.windows.net/feast"
os.environ["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = "abfss://DEFAULT-SYNAPSE-URL@DEFAULT-SYNAPSE-ADLS-ACCOUNT.dfs.core.windows.net/feast/out"
os.environ["FEAST_SPARK_STAGING_LOCATION"] = "wasbs://feasttest@feaststore.blob.core.windows.net/artifacts/"
os.environ["FEAST_SPARK_INGESTION_JAR"] = "https://feaststore.blob.core.windows.net/feastjar/feast-ingestion-spark-latest.jar"

# Redis Config
os.environ["FEAST_REDIS_HOST"] = "REDIS-HOST-NAME.redis.cache.windows.net"
os.environ["FEAST_REDIS_PORT"] = "6380"
os.environ["FEAST_REDIS_SSL"] = "true"
os.environ["FEAST_REDIS_AUTH"] = "REDIS-KEY"

# EventHub config: a JAAS login-module string whose password field holds the Event Hub connection string.
os.environ["FEAST_AZURE_EVENTHUB_KAFKA_CONNECTION_STRING"] = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://EVENT-HUB-ENDPOINT.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SHAREDACCESSKEY;EntityPath=driver_trips\";"

# Blob storage account credentials and the file format of historical retrieval output.
os.environ["FEAST_AZURE_BLOB_ACCOUNT_NAME"] = "FEAST-BLOB-STORAGE-ACCOUNT-NAME"
os.environ["FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY"] = "FEAST-BLOB-KEY"
os.environ["FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT"] = "parquet"

# Optional: service principal credentials, only needed if you sign in through a client app.
os.environ['AZURE_CLIENT_ID'] = 'AZURE_CLIENT_ID'
os.environ['AZURE_TENANT_ID'] = 'AZURE_TENANT_ID'
os.environ['AZURE_CLIENT_SECRET'] = 'AZURE_CLIENT_SECRET'

# Echo every FEAST_* setting now in effect.
# Caution: this output includes secrets (e.g. FEAST_REDIS_AUTH); clear it before sharing the notebook.
import os
from pprint import pprint
pprint({key: value for key, value in os.environ.items() if key.startswith("FEAST_")})

If you are following the quick start guide, all the configuration required for the remainder of the tutorial should already have been set up in the form of environment variables, as shown above. The configuration values may differ depending on the environment. For a full list of configurable values and their explanations, please refer to the user guide.

### Basic Imports and Feast Client initialization

In [4]:
import os

from feast import Client, Feature, Entity, ValueType, FeatureTable
from feast.data_source import FileSource, KafkaSource
from feast.data_format import ParquetFormat, AvroFormat

In [5]:
# Point the SDK at the Feast Core and Feast Serving endpoints, then select the
# project that all subsequent calls on this client operate on.
client = Client(
    core_url="CORE-URL:6565",
    serving_url="SERVING-URL:6566",
)
client.set_project("FEAST-PROJECT-NAME")

### Declare Features and Entities

Entity defines the primary key(s) associated with one or more feature tables. The entity must be registered before declaring the associated feature tables. 

In [6]:
# `driver_id` is the entity (primary key) that both feature tables in this notebook are keyed on.
driver_id = Entity(
    name="driver_id",
    value_type=ValueType.INT64,
    description="Driver identifier",
)

In [7]:
# Daily updated features (grouped into the `driver_statistics` feature table)
acc_rate = Feature("acc_rate", ValueType.FLOAT)
conv_rate = Feature("conv_rate", ValueType.FLOAT)
avg_daily_trips = Feature("avg_daily_trips", ValueType.INT32)

# Real-time updated features (grouped into the `driver_trips` feature table)
trips_today = Feature("trips_today", ValueType.INT32)

```python
FeatureTable(
    name = "driver_statistics",
    entities = ["driver_id"],
    features = [
        acc_rate,
        conv_rate,
        avg_daily_trips
    ]
    
)
```


```python
FeatureTable(
    name = "driver_trips",
    entities = ["driver_id"],
    features = [
        trips_today
    ]
    
)

```

![Features Join](https://raw.githubusercontent.com/feast-dev/feast-spark-examples/main/minimal/images/features-join.png)

```python
FeatureTable(
    ...,
    batch_source=FileSource(  # Required
        file_format=ParquetFormat(),
        file_url="abfss://feast-demo-data-lake",
        ...
    ),
    stream_source=KafkaSource(  # Optional
        bootstrap_servers="...",
        topic="driver_trips",
        ...
    )
)
```

Feature tables group the features together and describe how they can be retrieved. The following examples assume that the feature tables are stored on the local file system, and are accessible from the Spark cluster. If you have set up a GCP service account, you may use GCS instead as the file source.

`batch_source` defines where the historical features are stored. It is also possible to have an optional `stream_source`, through which the feature values are delivered continuously.

For now we will define only `batch_source` for both `driver_statistics` and `driver_trips`, and demonstrate the usage of `stream_source` in a later part of the tutorial.

In [8]:
# This is the location we're using for the offline feature store.
# The batch source URI of each feature table is built by joining a sub-path onto this base URI.

import os
demo_data_location = "wasbs://feasttest@feaststore.blob.core.windows.net/"


In [None]:
# Batch source location for the `driver_statistics` table.
driver_statistics_source_uri = os.path.join(demo_data_location, "driver_statistics")

# Feast joins the features to the entities based on the following conditions:
#
# 1. Entity primary key(s) value matches.
# 2. Feature event timestamp is the closest match possible to the entity event timestamp,
#    but must not be more recent than the entity event timestamp, and the difference must
#    not be greater than the maximum age specified in the feature table, unless the maximum
#    age is not specified.
# 3. If more than one feature table rows satisfy condition 1 and 2, feature row with the
#    most recent created timestamp will be chosen.
# 4. If none of the above conditions are satisfied, the feature rows will have null values.
#
# (These notes were previously a bare string literal placed between keyword arguments,
# which Python rejects with "positional argument follows keyword argument".)
driver_statistics = FeatureTable(
    name="driver_statistics",
    entities=["driver_id"],
    features=[
        acc_rate,
        conv_rate,
        avg_daily_trips,
    ],
    # You might want to change this interval to see how your features get joined,
    # because Feast joins your features based on it (see condition 2 above).
    max_age=Duration(seconds=3600 * 24 * 365),
    batch_source=FileSource(
        event_timestamp_column="datetime",
        created_timestamp_column="created",
        file_format=ParquetFormat(),
        file_url=driver_statistics_source_uri,
        date_partition_column="date",
    ),
)

: 

In [None]:
# Batch source location for the `driver_trips` table.
driver_trips_source_uri = os.path.join(demo_data_location, "driver_trips")

driver_trips = FeatureTable(
    name="driver_trips",
    entities=["driver_id"],
    features=[trips_today],
    # Maximum age of one day, expressed in seconds.
    max_age=Duration(seconds=86400),
    batch_source=FileSource(
        file_url=driver_trips_source_uri,
        file_format=ParquetFormat(),
        event_timestamp_column="datetime",
        created_timestamp_column="created",
        date_partition_column="date",
    ),
)

### Registering entities and feature tables in Feast Core

In [11]:
# Register the entity first, then the feature tables that reference it.
for feast_object in (driver_id, driver_statistics, driver_trips):
    client.apply(feast_object)

In [12]:
# Read both feature tables back from the registry and show their specs as YAML.
for table_name in ("driver_statistics", "driver_trips"):
    print(client.get_feature_table(table_name).to_yaml())

spec:
  name: driver_statistics
  entities:
  - driver_id
  features:
  - name: avg_daily_trips
    valueType: INT32
  - name: conv_rate
    valueType: FLOAT
  - name: acc_rate
    valueType: FLOAT
  maxAge: 86400s
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: wasbs://feasttest@feaststore.blob.core.windows.net/driver_statistics
meta:
  createdTimestamp: '2021-10-22T20:10:52Z'

spec:
  name: driver_trips
  entities:
  - driver_id
  features:
  - name: trips_today
    valueType: INT32
  maxAge: 86400s
  batchSource:
    type: BATCH_FILE
    eventTimestampColumn: datetime
    datePartitionColumn: date
    createdTimestampColumn: created
    fileOptions:
      fileFormat:
        parquetFormat: {}
      fileUrl: wasbs://feasttest@feaststore.blob.core.windows.net/driver_trips
meta:
  createdTimestamp: '2021-10-22T20:10:53Z'



### Populating batch source

Feast is agnostic to how the batch source is populated, as long as it complies with the Feature Table specification. Therefore, any existing ETL tool can be used for the purpose of data ingestion. Alternatively, you can also use the Feast SDK to ingest a pandas DataFrame into the batch source.

In [13]:
import pandas as pd
import numpy as np
from datetime import datetime

In [14]:
def generate_entities(size=100, id_upper_bound=999999):
    """Draw a random sample of distinct driver ids.

    Args:
        size: Number of driver ids to generate (default 100, as before).
        id_upper_bound: Exclusive upper bound of the id range; ids are drawn
            from ``0 .. id_upper_bound - 1`` (default 999999, as before).

    Returns:
        A NumPy array of ``size`` distinct integers.
    """
    # replace=False guarantees that no driver id appears twice.
    return np.random.choice(id_upper_bound, size=size, replace=False)

In [15]:
from datetime import datetime,date,timedelta

def generate_trips(entities):
    """Build a random `driver_trips` batch, one row per driver id.

    Args:
        entities: Sequence of driver ids. Its length determines the number of
            rows (previously the row count was hard-coded to 100, which failed
            for any other number of entities).

    Returns:
        A DataFrame with columns ``driver_id``, ``trips_today`` (int32 in
        0..999), ``datetime`` (random event timestamp at second resolution
        within the 2021-10-10 .. 2022-10-30 window) and ``created`` (the time
        of this call).
    """
    row_count = len(entities)
    trips_today = np.random.randint(0, 1000, size=row_count).astype(np.int32)
    # randint expects integer bounds; datetime.timestamp() returns a float.
    event_seconds = np.random.randint(
        int(datetime(2021, 10, 10).timestamp()),
        int(datetime(2022, 10, 30).timestamp()),
        size=row_count,
    )
    return pd.DataFrame(
        {
            "driver_id": entities,
            "trips_today": trips_today,
            "datetime": pd.to_datetime(event_seconds, unit="s"),
            "created": pd.to_datetime(datetime.now()),
        },
        columns=["driver_id", "trips_today", "datetime", "created"],
    )




In [16]:
def generate_stats(entities):
    """Build a random `driver_statistics` batch, one row per driver id.

    Args:
        entities: Sequence of driver ids. Its length determines the number of
            rows (previously the row count was hard-coded to 100, which failed
            for any other number of entities).

    Returns:
        A DataFrame with columns ``driver_id``, ``conv_rate`` and ``acc_rate``
        (float32 in [0, 1)), ``avg_daily_trips`` (int32 in 0..999),
        ``datetime`` (random event timestamp at second resolution within the
        2021-10-10 .. 2022-10-30 window) and ``created`` (the time of this call).
    """
    row_count = len(entities)
    # Random draws are made in the same order as the column order below.
    conv_rate = np.random.random(size=row_count).astype(np.float32)
    acc_rate = np.random.random(size=row_count).astype(np.float32)
    avg_daily_trips = np.random.randint(0, 1000, size=row_count).astype(np.int32)
    # randint expects integer bounds; datetime.timestamp() returns a float.
    event_seconds = np.random.randint(
        int(datetime(2021, 10, 10).timestamp()),
        int(datetime(2022, 10, 30).timestamp()),
        size=row_count,
    )
    return pd.DataFrame(
        {
            "driver_id": entities,
            "conv_rate": conv_rate,
            "acc_rate": acc_rate,
            "avg_daily_trips": avg_daily_trips,
            "datetime": pd.to_datetime(event_seconds, unit="s"),
            "created": pd.to_datetime(datetime.now()),
        },
        columns=["driver_id", "conv_rate", "acc_rate", "avg_daily_trips", "datetime", "created"],
    )

In [17]:
# Generate one shared set of driver ids so that both feature tables describe the same drivers.
entities = generate_entities()
stats_df = generate_stats(entities)
trips_df = generate_trips(entities)

In [18]:
# Write each generated DataFrame into the batch source of its feature table.
for feature_table, source_df in (
    (driver_statistics, stats_df),
    (driver_trips, trips_df),
):
    client.ingest(feature_table, source_df)

Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.
Removing temporary file(s)...
Data has been successfully ingested into FeatureTable batch source.


## Historical Retrieval For Training

### Point-in-time correction

![Point In Time](https://raw.githubusercontent.com/feast-dev/feast-spark-examples/main/minimal/images/pit-1.png)

Feast joins the features to the entities based on the following conditions:

1. Entity primary key(s) value matches.
2. Feature event timestamp is the closest match possible to the entity event timestamp,
   but must not be more recent than the entity event timestamp, and the difference must
   not be greater than the maximum age specified in the feature table, unless the maximum age is not specified.
3. If more than one feature table rows satisfy condition 1 and 2, feature row with the
   most recent created timestamp will be chosen.
4. If none of the above conditions are satisfied, the feature rows will have null values.

In [19]:
from pyarrow.parquet import ParquetDataset
from urllib.parse import urlparse

In [20]:
import glob

def read_parquet(uri):
    """Load the Parquet data stored at ``uri`` into a pandas DataFrame.

    Supported URL schemes:

    * ``file``  - read directly from the local path.
    * ``wasbs`` - read every ``part-*`` object under the path from Azure Blob
      Storage, using the account in ``FEAST_AZURE_BLOB_ACCOUNT_NAME`` /
      ``FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY``.
    * ``abfss`` - download every ``*.parquet`` file under the path from the data
      lake into a private temporary directory and read them from there, using
      the service principal in ``AZURE_TENANT_ID`` / ``AZURE_CLIENT_ID`` /
      ``AZURE_CLIENT_SECRET``.

    Args:
        uri: Location of the Parquet data.

    Returns:
        The data as a pandas DataFrame.

    Raises:
        ValueError: If the URL scheme is not one of the above.
        FileNotFoundError: If an ``abfss`` location contains no ``*.parquet`` file.
    """
    parsed_uri = urlparse(uri)

    if parsed_uri.scheme == "file":
        return pd.read_parquet(parsed_uri.path)

    if parsed_uri.scheme == "wasbs":
        import adlfs

        fs = adlfs.AzureBlobFileSystem(
            account_name=os.getenv("FEAST_AZURE_BLOB_ACCOUNT_NAME"),
            account_key=os.getenv("FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY"),
        )
        # For wasbs://<container>@<account>/<path>, urlparse exposes the container as `username`.
        uripath = parsed_uri.username + parsed_uri.path
        files = fs.glob(uripath + "/part-*")
        return ParquetDataset(files, filesystem=fs).read().to_pandas()

    if parsed_uri.scheme == "abfss":
        import tempfile

        credential = ClientSecretCredential(
            os.getenv("AZURE_TENANT_ID"),
            os.getenv("AZURE_CLIENT_ID"),
            os.getenv("AZURE_CLIENT_SECRET"),
        )
        # For abfss://<file system>@<account host>/<path>, the netloc holds both parts.
        file_system_name, account_host = parsed_uri.netloc.split("@", 1)
        service_client = DataLakeServiceClient(
            account_url="https://" + account_host, credential=credential
        )
        file_system_client = service_client.get_file_system_client(file_system_name)
        directory_client = file_system_client.get_directory_client(parsed_uri.path)

        # Select the data files by extension instead of blindly dropping the first
        # listed entry: this skips marker files without ever losing a data file.
        # NOTE(review): only the base name is kept, as before, so files in nested
        # sub-directories are looked up directly under the target directory - confirm
        # the output layout is flat.
        parquet_names = [
            entry.name.split("/")[-1]
            for entry in file_system_client.get_paths(path=parsed_uri.path)
            if not entry.is_directory and entry.name.endswith(".parquet")
        ]
        if not parquet_names:
            raise FileNotFoundError(f"No parquet files found under {uri}")

        # Download into a fresh temporary directory rather than the shared /tmp, so
        # parquet files left behind by earlier runs can never be mixed into the result.
        with tempfile.TemporaryDirectory() as download_dir:
            local_paths = []
            for file_name in parquet_names:
                local_path = os.path.join(download_dir, file_name)
                file_client = directory_client.get_file_client(file_name)
                # A failed download now propagates instead of being printed and
                # ignored, which used to yield a silently incomplete DataFrame.
                with open(local_path, "wb") as local_file:
                    local_file.write(file_client.download_file().readall())
                local_paths.append(local_path)
            # Materialise the DataFrame before the temporary directory is removed.
            return ParquetDataset(local_paths).read().to_pandas()

    raise ValueError(f"Unsupported URL scheme {uri}")

In [21]:
# Entity dataframe for historical retrieval: 10 distinct drivers picked from the
# generated ids, each paired with a random event timestamp.
entities_with_timestamp = pd.DataFrame(
    {
        "driver_id": np.random.choice(entities, 10, replace=False),
        "event_timestamp": pd.to_datetime(
            np.random.randint(
                datetime(2021, 10, 18).timestamp(),
                datetime(2022, 10, 30).timestamp(),
                size=10,
            ),
            unit="s",
        ),
    },
    columns=["driver_id", "event_timestamp"],
)
entities_with_timestamp

Unnamed: 0,driver_id,event_timestamp
0,62895,2021-10-21 08:02:52
1,333440,2021-10-26 04:24:48
2,848089,2021-10-26 02:16:20
3,471879,2021-10-24 12:15:02
4,233981,2021-10-25 09:37:49
5,908631,2021-10-25 20:54:12
6,847625,2021-10-23 10:42:38
7,158956,2021-10-21 06:35:24
8,176584,2021-10-29 11:51:31
9,563727,2021-10-25 00:37:51


In [22]:
# Submit a Spark job that joins the requested features onto the entity dataframe.
# Each feature reference has the form "<feature table>:<feature>".
job = feast_spark.Client(client).get_historical_features(
    feature_refs=[
        "driver_statistics:avg_daily_trips",
        "driver_statistics:conv_rate",
        "driver_statistics:acc_rate",
        "driver_trips:trips_today"
    ], 
    entity_source=entities_with_timestamp
)

![Spark Job](https://feaststore.blob.core.windows.net/feastjar/SparkJobSubmission.PNG)


In [23]:
# get_output_file_uri will block until the Spark job is completed.
# The returned URI is the location of the retrieval result.
output_file_uri = job.get_output_file_uri()

In [24]:
# Load the joined result into a pandas DataFrame and display it.
read_parquet(output_file_uri)

The retrieved result can now be used for model training.

## Populating Online Storage with Batch Ingestion

In order to populate the online storage, we can use Feast SDK to start a Spark batch job which will extract the features from the batch source, then load the features to an online store.

In [25]:
# Start a Spark batch job that loads `driver_statistics` rows from the batch source
# into the online store. The two datetimes bound the ingested window; they match the
# timestamp range used when the demo data was generated.
job = feast_spark.Client(client).start_offline_to_online_ingestion(
    driver_statistics,
    datetime(2021, 10, 10),
    datetime(2022, 10, 30)
)

In [26]:
# It will take some time before the Spark Job is completed
# NOTE(review): `_wait_for_complete` is an underscore-prefixed (private) method and the
# argument is presumably a timeout in seconds - confirm against the feast_spark job API.
job._wait_for_complete(300)
print(job.get_status())

SparkJobStatus.COMPLETED


Once the job is completed, the SDK can be used to retrieve the result from the online store.

In [27]:
# Pick 10 distinct drivers and shape them as the entity rows used for online retrieval.
entities_sample = [
    {"driver_id": sampled_id}
    for sampled_id in np.random.choice(entities, 10, replace=False)
]
entities_sample

[{'driver_id': 818442},
 {'driver_id': 720806},
 {'driver_id': 497691},
 {'driver_id': 62895},
 {'driver_id': 869039},
 {'driver_id': 211914},
 {'driver_id': 246306},
 {'driver_id': 35554},
 {'driver_id': 452834},
 {'driver_id': 471879}]

In [None]:
# Look up the latest `avg_daily_trips` value of each sampled driver in the online store.
features = client.get_online_features(
    entity_rows=entities_sample,
    feature_refs=["driver_statistics:avg_daily_trips"],
).to_dict()
features

In [None]:
#pd.DataFrame(features)

The features can now be used as an input to the trained model.

## Bonus: Ingestion from Streaming Source - EventHub

With a streaming source, we can use the Feast SDK to launch a Spark streaming job that continuously updates the online store. First, we will update the `driver_trips` feature table such that a new streaming source is added.

In [19]:
import json
import pytz
import io
import avro.schema
from avro.io import BinaryEncoder, DatumWriter
from confluent_kafka import Producer

In [20]:
# Change this to any Kafka broker addresses which is accessible by the spark cluster
# Placeholder: replace with the host:port of your Event Hub's Kafka endpoint.
KAFKA_BROKER = "EVENTHUB-ADDRESS.servicebus.windows.net:9093"

In [21]:
# Avro schema (serialized as JSON) of the records published to the stream:
# one record per driver with its trip count and the event time.
avro_schema_json = json.dumps(
    dict(
        type="record",
        name="DriverTrips",
        fields=[
            dict(name="driver_id", type="long"),
            dict(name="trips_today", type="int"),
            dict(
                name="datetime",
                type=dict(type="long", logicalType="timestamp-micros"),
            ),
        ],
    )
)

In [22]:
# Attach a Kafka stream source to `driver_trips` and re-register the table.
# The stream carries no separate creation time, so the event time column is used for both.
kafka_topic = "driver_trips"
driver_trips.stream_source = KafkaSource(
    bootstrap_servers=KAFKA_BROKER,
    topic=kafka_topic,
    message_format=AvroFormat(avro_schema_json),
    event_timestamp_column="datetime",
    created_timestamp_column="datetime",
)
client.apply(driver_trips)

Start the streaming job and send Avro records to EventHub:

In [24]:
# Additional jars passed to the streaming ingestion job (Spark/Kafka connector libraries).
extra_jar_list = [
    "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.2/spark-sql-kafka-0-10_2.12-3.0.2.jar",
    "https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.0/kafka-clients-2.8.0.jar",
    "https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.12/2.8.0/kafka_2.12-2.8.0.jar",
    "https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.0.2/spark-token-provider-kafka-0-10_2.12-3.0.2.jar",
]

# Launch the Spark streaming job that ingests `driver_trips` from its stream source.
job = feast_spark.Client(client).start_stream_to_online_ingestion(
    driver_trips, extra_jars=extra_jar_list
)

In [None]:
def send_avro_record_to_kafka(topic, record):
    """Avro-encode ``record`` and publish it to ``topic`` on the Event Hub Kafka endpoint.

    The record is serialized with the schema in ``avro_schema_json`` and sent to
    the broker in ``KAFKA_BROKER``. The SASL password (the Event Hub connection
    string) is taken from the ``password="..."`` field of the
    ``FEAST_AZURE_EVENTHUB_KAFKA_CONNECTION_STRING`` environment variable, so no
    access key is stored in the notebook source.

    Args:
        topic: Name of the Kafka topic (Event Hub) to publish to.
        record: Dict whose keys match the fields of the Avro schema.

    Raises:
        ValueError: If the environment variable is missing or contains no
            ``password="..."`` field.
    """
    import re

    value_schema = avro.schema.parse(avro_schema_json)
    writer = DatumWriter(value_schema)
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer.write(record, encoder)

    # Never hard-code the shared access key: reuse the connection string that the
    # configuration cell already provides inside the JAAS login-module setting.
    jaas_config = os.getenv("FEAST_AZURE_EVENTHUB_KAFKA_CONNECTION_STRING", "")
    password_match = re.search(r'password="([^"]+)"', jaas_config)
    if password_match is None:
        raise ValueError(
            "FEAST_AZURE_EVENTHUB_KAFKA_CONNECTION_STRING must be set and contain "
            'a password="<Event Hub connection string>" field'
        )

    conf = {
        # Same broker as the one registered on the feature table's stream source.
        'bootstrap.servers': KAFKA_BROKER,
        'security.protocol': 'SASL_SSL',
        # NOTE(review): CA bundle path is distribution-specific - adjust for your OS.
        'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': password_match.group(1),
        'client.id': 'python-example-producer'
    }

    producer = Producer(conf)
    producer.produce(topic=topic, value=bytes_writer.getvalue())
    # Block until the message has actually been delivered.
    producer.flush()

In [None]:
# Note: depending on the Kafka configuration you may need to create the Kafka topic first, like below
# (create_topics expects a list of NewTopic objects):
#from confluent_kafka.admin import AdminClient, NewTopic
#admin = AdminClient({'bootstrap.servers': KAFKA_BROKER})
#new_topic = NewTopic('driver_trips', num_partitions=1, replication_factor=3)
#admin.create_topics([new_topic])

# Publish every generated trip row to the stream. The `created` column is dropped
# because it is not part of the Avro schema.
# 'records' is the full orient name; the abbreviation 'record' is rejected by pandas >= 2.0.
for record in trips_df.drop(columns=['created']).to_dict('records'):
    # The timestamp-micros logical type is given a timezone-aware datetime: mark the value as UTC.
    record["datetime"] = (
        record["datetime"].to_pydatetime().replace(tzinfo=pytz.utc)
    )

    send_avro_record_to_kafka(topic=kafka_topic, record=record)

### Retrieving joined features from several feature tables

In [None]:
# Pick a fresh sample of 10 distinct drivers, shaped as entity rows for online retrieval.
entities_sample = [
    {"driver_id": sampled_id}
    for sampled_id in np.random.choice(entities, 10, replace=False)
]
entities_sample

In [None]:
# Retrieve features from BOTH feature tables in a single call: the batch-ingested
# `driver_statistics` value and the stream-ingested `driver_trips` value.
# (Previously only `driver_statistics` was requested, so nothing was actually joined.)
features = client.get_online_features(
    feature_refs=[
        "driver_statistics:avg_daily_trips",
        "driver_trips:trips_today",
    ],
    entity_rows=entities_sample).to_dict()

In [None]:
# Show the retrieved online features as a table (one row per sampled driver).
pd.DataFrame(features)

In [None]:
# This will stop the streaming job
# job.cancel()