# Building streaming features with Feast, Spark, and Kafka

## 1. Overview

This notebook explores how data scientists and engineers can build streaming features in Feast.
It builds off of the first [module](https://github.com/feast-dev/feast-workshop/tree/main/module_1) from the Feast workshop.
The workshop module is not a prerequisite, but is recommended if you are not already familiar with the Push API in Feast.

Streaming features are a powerful tool to decrease training-serving skew.
If built correctly, models in production can use extremely fresh features, often yielding dramatic performance improvements.
However, there are many challenges around building streaming features.
For example, how does a data scientist define streaming features?
How does the data scientist connect the stream data source with the appropriate batch data source?
What if there are transformations involved?
Who is responsible for keeping the streaming features alive in production?
How do they ensure that the streaming features in production are consistent with the features available for training in an offline setting?
Feast can help solve these problems.

All the necessary resources to run this notebook are in `kafka_spark_demo`.
These include a sample feature repo and Dockerfiles.

### Table of Contents

A typical pattern that we see is that data scientists use Feast to define, use, and share features, which greatly improves their productivity, while ML engineers use Feast to ensure that all the features are available for both training and serving.
This notebook is organized as follows.
* Sections 2 and 3 do some basic setup.
* Sections 4 and 5 show how a data scientist can define and use stream features in Feast.
* Section 6 then shows how an ML engineer can use the Push API to ensure that those streaming features are available in production.

All the capabilities discussed here are available in Feast 0.22.1, the latest version at the time of writing.
The only capability that requires custom logic is ingesting features from a stream data source (in section 6), which would typically be handled by an ML platform team.
We have written sample code to use Spark Structured Streaming to ingest features from a Kafka data source, and the approach we used can be modified to work for other stream processing engines and stream data sources.

## 2. Set up feature store, Kafka, and Redis

### Apply feature repository
We first run `feast apply` to register our data sources and features.

In [1]:
!feast apply

Created entity driver
Created entity customer
Created feature view customer_stats
Created stream feature view driver_hourly_stats_stream

Deploying infrastructure for customer_stats


### Spin up Kafka + Redis

We then use Docker Compose to spin up Kafka and Redis. Kafka simulates the streaming infrastructure that provides data for the `driver_hourly_stats_stream` feature view, and Redis is used for the online store.

We can spin these services up by running `docker-compose up` from the `kafka_spark_demo` directory. This leverages a script (in `kafka_demo/`) that creates a topic, reads from `feature_repo/data/driver_stats_stream.parquet`, generates newer timestamps, and emits them to the topic. It also deploys an instance of Redis.

```
$ docker-compose up
Creating network "kafka_spark_demo_default" with the default driver
Creating redis     ... done
Creating zookeeper ... done
Creating broker    ... done
Creating kafka_events ... done
Attaching to zookeeper, redis, broker, kafka_events
```
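
The "generates newer timestamps" step can be pictured with a small sketch. This is not the script shipped in `kafka_demo/`; it is a minimal illustration that assumes the replay shifts every event time by a single offset so that the most recent event lands on the current time. The function name `shift_to_now` and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def shift_to_now(timestamps, now):
    """Shift all timestamps by one offset so the latest one equals `now`."""
    offset = now - max(timestamps)
    return [ts + offset for ts in timestamps]


# Hypothetical historical event times, one hour apart.
historical = [
    datetime(2021, 4, 12, 8, 0, tzinfo=timezone.utc),
    datetime(2021, 4, 12, 9, 0, tzinfo=timezone.utc),
    datetime(2021, 4, 12, 10, 0, tzinfo=timezone.utc),
]

# A fixed "now" keeps the example deterministic; a real replay would use the clock.
now = datetime(2022, 6, 27, 15, 0, tzinfo=timezone.utc)
fresh = shift_to_now(historical, now)

for old, new in zip(historical, fresh):
    print(old.isoformat(), "->", new.isoformat())
```

Using one offset for the whole file preserves the spacing between events, so the replayed stream keeps the hourly cadence of the original data.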

## 3. Explore existing feature views

Let's assume the role of a data scientist who wants to train a model to determine which customers and drivers should be matched together. We start by exploring the existing feature views, as another data scientist might have already defined a useful feature view.

If we inspect `features.py`, we'll see that a `customer_stats` feature view (assigned to the variable `customer_stats_view`) already exists. Let's inspect the feature values.

In [2]:
from feast import FeatureStore

store = FeatureStore(repo_path=".")

### Fetch training data from offline store

In [3]:
import pandas as pd
from datetime import datetime

entity_df = pd.DataFrame.from_dict(
    {
        "customer_id": [5001, 5002, 5003, 5004, 5001],
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
            datetime(2021, 4, 12, 15, 1, 12),
            datetime.now()
        ]
    }
)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_stats:current_balance",
        "customer_stats:avg_passenger_count",
        "customer_stats:lifetime_trip_count",
    ],
).to_df()
print(training_df.head())

   customer_id                  event_timestamp  current_balance  \
0         5001        2021-04-12 10:59:42+00:00         0.174109   
1         5003        2021-04-12 16:40:26+00:00         0.735872   
2         5004        2021-04-12 15:01:12+00:00         0.885541   
3         5002        2021-04-12 08:12:10+00:00         0.922777   
4         5001 2022-06-27 15:19:26.431431+00:00         0.544859   

   avg_passenger_count  lifetime_trip_count  
0             0.384933                   14  
1             0.542926                  616  
2             0.774241                  129  
3             0.167704                  844  
4             0.087846                  240  
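
Customer `5001` appears twice in the entity dataframe with different timestamps and receives different feature values. That is because historical retrieval in Feast is a point-in-time join: each entity row gets the latest feature values recorded at or before its own `event_timestamp`. The sketch below illustrates the idea in plain Python. It is a simplification that ignores the TTL and the `created` tie-breaker, and the rows and the helper `point_in_time_lookup` are hypothetical.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature rows for one customer, sorted by event time.
feature_rows = [
    (datetime(2021, 4, 12, 7, 0), {"lifetime_trip_count": 10}),
    (datetime(2021, 4, 12, 10, 0), {"lifetime_trip_count": 14}),
    (datetime(2021, 4, 13, 9, 0), {"lifetime_trip_count": 20}),
]


def point_in_time_lookup(rows, as_of):
    """Return the latest feature values whose event time is <= as_of, or None."""
    times = [ts for ts, _ in rows]
    idx = bisect_right(times, as_of)
    return rows[idx - 1][1] if idx > 0 else None


# Two lookups for the same customer at different points in time.
print(point_in_time_lookup(feature_rows, datetime(2021, 4, 12, 10, 59, 42)))
print(point_in_time_lookup(feature_rows, datetime(2021, 4, 14, 0, 0)))
```

Looking up values this way keeps future information out of the training set, since a row never sees feature values recorded after its own timestamp.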




## 4. Create stream feature view

We're satisfied with the `customer_stats` feature view, as it provides all the necessary features for a customer. But we still need features for drivers. Moreover, the streaming team has told us that there is a Kafka stream that can provide extremely fresh features for drivers. To take advantage of those fresh features, we just need to define a `StreamFeatureView` instead of a standard `FeatureView`, and pass that Kafka source to the `StreamFeatureView`. For convenience, the feature repo already contains the `KafkaSource` and `StreamFeatureView`.

Let's take a closer look.

In [4]:
!cat data_sources.py

from datetime import timedelta

from feast import (
    FileSource,
    KafkaSource,
)
from feast.data_format import JsonFormat

# Feast also supports pulling data from data warehouses like BigQuery, Snowflake, Redshift and data lakes (e.g. via Redshift Spectrum, Trino, Spark)
customer_stats_batch_source = FileSource(
    name="customer_stats_source",
    path="data/customer_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
    description="A table describing the stats of a customer based on daily logs",
    owner="test1@gmail.com",
)

driver_stats_batch_source = FileSource(
    name="driver_stats_source",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
    description="A table describing the stats of a driver based on hourly logs",
    owner="test2@gmail.com",
)

driver_stats_stream_source = KafkaSource(
    name="driver_stats_stream",
    kafk

As data scientists, we don't need to know much about this Kafka stream. All we need to define the `KafkaSource` are the bootstrap servers, the topic, and the schema. We can rely on our ML platform team to ensure that the stream is up and running in production (which is covered in a later section of this notebook). Note that we also specify a `batch_source` in the definition of this `KafkaSource`, which is the `driver_stats_batch_source` object defined above. This batch source is where our training data lives. When we want historical data to train our model, we retrieve it from the batch source. Moreover, our ML platform team will ensure that any streaming data that is available in production eventually makes its way into the batch source.

Now let's take a closer look at the stream feature view.

In [5]:
!cat features.py

from datetime import timedelta
from pyspark.sql import DataFrame

from feast import (
    FeatureView,
    Field,
)
from feast.stream_feature_view import stream_feature_view
from feast.types import Float32, Int32

from data_sources import *
from entities import *

customer_stats_view = FeatureView(
    name="customer_stats",
    description="customer features",
    entities=[customer],
    ttl=timedelta(seconds=8640000000),
    schema=[
        Field(name="current_balance", dtype=Float32),
        Field(name="avg_passenger_count", dtype=Float32),
        Field(name="lifetime_trip_count", dtype=Int32),
    ],
    online=True,
    source=customer_stats_batch_source,
    tags={"production": "True"},
    owner="test1@gmail.com",
)

@stream_feature_view(
    entities=[driver],
    ttl=timedelta(seconds=8640000000),
    mode="spark",
    schema=[
        Field(name="conv_percentage", dtype=Float32),
        Field(name="acc_percentage", dtype=Float32),
    

A stream feature view requires mostly the same parameters as a normal feature view. One new capability with stream feature views is the ability to define an associated transformation. In this case, the transformation is a PySpark UDF that converts the rates into percentages by multiplying them by 100. This is useful when the features in the stream are in decimal format, e.g. `0.5211`, but we want to use percentages, e.g. `52.11`. Since we added a transformation, we also specified `mode="spark"`, which indicates that we will use PySpark to process the stream.
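
To make the rate-to-percentage conversion concrete, here is a plain-Python equivalent of what the transformation computes for a single record. It is only a sketch: the real transformation operates on a Spark `DataFrame`, and the input column names `conv_rate` and `acc_rate` are assumptions, since the listing above is cut off before the function body.

```python
def to_percentages(row):
    """Convert the decimal rates in one record to percentages (rate * 100)."""
    return {
        "driver_id": row["driver_id"],
        "conv_percentage": row["conv_rate"] * 100.0,
        "acc_percentage": row["acc_rate"] * 100.0,
    }


# Hypothetical stream record with rates in decimal format.
record = {"driver_id": 1001, "conv_rate": 0.5211, "acc_rate": 0.75}
print(to_percentages(record))
```

The output keys match the `conv_percentage` and `acc_percentage` fields declared in the stream feature view's schema, which is what ties the transformation to the features we retrieve later.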

Let's also inspect the historical features for the stream feature view.

In [6]:
entity_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001, 1002, 1003, 1004, 1001],
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
            datetime(2021, 4, 12, 15, 1, 12),
            datetime.now()
        ]
    }
)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats_stream:conv_percentage",
        "driver_hourly_stats_stream:acc_percentage",
    ],
).to_df()
print(training_df.head())

   driver_id                  event_timestamp  conv_percentage  acc_percentage
0       1001        2021-04-12 10:59:42+00:00        52.114902       75.165855
1       1003        2021-04-12 16:40:26+00:00        18.885477       34.473606
2       1004        2021-04-12 15:01:12+00:00        29.649216       93.530525
3       1002        2021-04-12 08:12:10+00:00         8.901370       21.263689
4       1001 2022-06-27 15:19:27.924174+00:00        40.458847       40.757076




This is exactly what we need to train our model!

## 5. Productionize stream feature view

We can materialize the features from our batch source into the online store to use them in production. The features will not be very fresh as we are not taking advantage of the Kafka stream, but we will handle this in the next section.

In [7]:
!feast materialize-incremental $(date +%Y-%m-%d)

Materializing 2 feature views to 2022-06-26 17:00:00-07:00 into the redis online store.

customer_stats from 1748-09-11 22:19:35-07:52:58 to 2022-06-26 17:00:00-07:00:
100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 213.68it/s]
driver_hourly_stats_stream from 1748-09-11 22:19:36-07:52:58 to 2022-06-26 17:00:00-07:00:
100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 670.60it/s]
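
Conceptually, materialization keeps only the most recent row per entity within the requested time range and writes it to the online store. The sketch below shows that selection in plain Python. It simplifies what Feast does (the exact handling of the range boundaries may differ), and the helper `latest_per_entity` and the sample rows are hypothetical.

```python
from datetime import datetime


def latest_per_entity(rows, start, end):
    """Keep, for each driver_id, the newest row with start <= event_timestamp <= end."""
    latest = {}
    for row in rows:
        ts = row["event_timestamp"]
        if not (start <= ts <= end):
            continue  # outside the materialization window
        key = row["driver_id"]
        if key not in latest or ts > latest[key]["event_timestamp"]:
            latest[key] = row
    return latest


# Hypothetical batch rows; the last one falls after the end of the window.
rows = [
    {"driver_id": 1001, "event_timestamp": datetime(2022, 3, 14, 13, 0), "conv_percentage": 35.0},
    {"driver_id": 1001, "event_timestamp": datetime(2022, 3, 14, 14, 0), "conv_percentage": 40.5},
    {"driver_id": 1002, "event_timestamp": datetime(2022, 3, 14, 12, 0), "conv_percentage": 8.9},
    {"driver_id": 1002, "event_timestamp": datetime(2022, 7, 1, 0, 0), "conv_percentage": 9.9},
]

online = latest_per_entity(rows, datetime(2022, 1, 1), datetime(2022, 6, 26))
for driver_id, row in sorted(online.items()):
    print(driver_id, row["event_timestamp"], row["conv_percentage"])
```

This is why the online store holds one value per feature and entity, while the offline store retains the full history.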


Let's confirm that the features have been materialized correctly.

In [8]:
features = store.get_online_features(
    features=[
        "driver_hourly_stats_stream:conv_percentage",
        "driver_hourly_stats_stream:acc_percentage",
    ],
    entity_rows=[
        {
            "driver_id": 1001,
        }
    ],
).to_dict(include_event_timestamps=True)

def print_online_features(features):
    for key, value in sorted(features.items()):
        print(key, " : ", value)

print_online_features(features)

acc_percentage  :  [40.757076263427734]
acc_percentage__ts  :  [1647266400]
conv_percentage  :  [40.45884704589844]
conv_percentage__ts  :  [1647266400]
driver_id  :  [1001]
driver_id__ts  :  [0]




## 6. Ingesting and transforming data from a Kafka topic

Now let's switch to the perspective of an ML engineer. We see that a data scientist has just registered a stream feature view, and we are now responsible for ensuring that fresh features from the stream are available in production.

We will use Spark Structured Streaming to ingest from a Kafka topic, transform the data, and then push the data to the online store. We start by setting up a Spark session, as well as importing several other modules we'll use later.

In [9]:
import os
from pyspark.sql import SparkSession

# See https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html#deploying for notes on why we need this environment variable.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"
spark = SparkSession.builder.master("local").appName("feast-spark").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 5)



:: loading settings :: url = jar:file:/Users/felixwang/feast/env/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/felixwang/.ivy2/cache
The jars for the packages stored in: /Users/felixwang/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-406f197c-467a-49a5-9f61-1bc4c65719db;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in local-m2-cache
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 337ms :: artifacts dl 13ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.4-3 from central in [default]
	org.apache.commons#commons-po

Feast currently does not support launching and orchestrating Spark jobs to read from Kafka; any Feast user that wishes to handle streaming data must write custom logic to manage their own streaming infrastructure.
As an example of what that might look like for Spark and Kafka, please see the sample code (~200 lines) [here](https://github.com/feast-dev/feast/tree/master/sdk/python/feast/infra/contrib).

**The sample code contains all the custom logic necessary to ingest and transform streaming data for Feast.**

It requires only three things.
First, the feature store object, which will push the data into the online store.
Second, the stream feature view whose stream data source will be ingested.
And third, a config object for the stream processing engine of choice.
An optional fourth parameter is a preprocessing function, which allows you to execute custom logic after the transformation has been executed, but before the data has been written into the online store.
Since we are working in a notebook, we will simply use the preprocessing function to print.
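
The order of operations described above (transform, then the optional preprocessing hook, then the write to the online store) can be sketched without Spark or Feast. Everything in this example is a hypothetical stand-in: `process_micro_batch`, `transform`, `log_rows`, and `write_online` are illustrative names, not part of the sample code or the Feast API.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, float]


def process_micro_batch(
    rows: List[Row],
    transform: Callable[[List[Row]], List[Row]],
    write_online: Callable[[List[Row]], None],
    preprocess_fn: Optional[Callable[[List[Row]], List[Row]]] = None,
) -> None:
    """Apply the transformation, then the optional hook, then write the rows."""
    transformed = transform(rows)
    if preprocess_fn is not None:
        transformed = preprocess_fn(transformed)
    write_online(transformed)


# Hypothetical stand-ins for the transformation and the online store.
online_store: List[Row] = []
calls: List[str] = []


def transform(rows):
    calls.append("transform")
    return [{**r, "conv_percentage": r["conv_rate"] * 100.0} for r in rows]


def log_rows(rows):
    calls.append("preprocess")
    print(f"about to write {len(rows)} rows")
    return rows


def write_online(rows):
    calls.append("write")
    online_store.extend(rows)


process_micro_batch([{"driver_id": 1001, "conv_rate": 0.5}], transform, write_online, log_rows)
print(calls)
```

Because the hook runs on already transformed rows and returns the rows to write, it is a convenient place for logging, filtering, or validation.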

Now let's set up each of these items.

In [10]:
from feast.infra.contrib.stream_processor import ProcessorConfig
from feast.infra.contrib.spark_kafka_processor import SparkProcessorConfig
from feast.infra.contrib.stream_processor import get_stream_processor_object

def preprocess_fn(rows: pd.DataFrame):
    print(f"df columns: {rows.columns}")
    print(f"df size: {rows.size}")
    print(f"df preview:\n{rows.head()}")
    return rows

ingestion_config = SparkProcessorConfig(
    mode="spark",
    source="kafka",
    spark_session=spark,
    processing_time="30 seconds",
    query_timeout=15,
)
sfv = store.get_stream_feature_view("driver_hourly_stats_stream")

processor = get_stream_processor_object(
    config=ingestion_config,
    fs=store,
    sfv=sfv,
    preprocess_fn=preprocess_fn,
)

We are finally ready!

We have already materialized features from the offline store and retrieved them, so we expect that after our ingestion job, any features we retrieve will be fresher and have more recent timestamps.

Now we launch the ingestion job.
Let's let it run for a little bit to ensure that the job is indeed pushing data to the online store.
Note that in the configuration above we set the processing time to be 30 seconds, so it might take a little while before we see a non-empty dataframe being ingested.

In [11]:
query = processor.ingest_stream_feature_view()

22/06/27 15:20:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


df columns: Index(['event_timestamp', 'created', 'conv_percentage', 'acc_percentage'], dtype='object')
df size: 0
df preview:
Empty DataFrame
Columns: [event_timestamp, created, conv_percentage, acc_percentage]
Index: []




df columns: Index(['event_timestamp', 'created', 'conv_percentage', 'acc_percentage'], dtype='object')
df size: 20
df preview:
              event_timestamp                          created  \
driver_id                                                        
1001      2024-02-25 19:00:00 2022-06-27 22:20:31.158722+00:00   
1002      2024-02-25 19:00:00 2022-06-27 22:20:31.158722+00:00   
1003      2024-02-25 18:00:00 2022-06-27 22:20:31.158722+00:00   
1004      2024-02-25 19:00:00 2022-06-27 22:20:31.158722+00:00   
1005      2024-02-25 19:00:00 2022-06-27 22:20:31.158722+00:00   

           conv_percentage  acc_percentage  
driver_id                                   
1001             67.108572       38.001445  
1002             51.633394       15.392214  
1003             70.622385       46.187967  
1004             90.648526        3.402347  
1005             15.650299       96.641272  


Next we retrieve features.

In [12]:
features = store.get_online_features(
    features=[
        "driver_hourly_stats_stream:conv_percentage",
        "driver_hourly_stats_stream:acc_percentage",
    ],
    entity_rows=[
        {
            "driver_id": 1001,
        }
    ],
).to_dict(include_event_timestamps=True)

def print_online_features(features):
    for key, value in sorted(features.items()):
        print(key, " : ", value)

print_online_features(features)

acc_percentage  :  [38.00144577026367]
acc_percentage__ts  :  [1708887600]
conv_percentage  :  [67.10857391357422]
conv_percentage__ts  :  [1708887600]
driver_id  :  [1001]
driver_id__ts  :  [0]




The new features are fresher! We can see that the feature values are different and the timestamps are more recent.
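
The `__ts` values are easier to compare once converted to dates. The snippet below assumes they are Unix epoch seconds, which is consistent with the `event_timestamp` column in the ingested dataframe preview. The helper name `to_utc` is just for illustration.

```python
from datetime import datetime, timezone


def to_utc(epoch_seconds):
    """Interpret an integer as Unix epoch seconds and return an aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


before = to_utc(1647266400)  # timestamp returned after materialization
after = to_utc(1708887600)   # timestamp returned after stream ingestion

print(before.isoformat())  # 2022-03-14T14:00:00+00:00
print(after.isoformat())   # 2024-02-25T19:00:00+00:00
print(after - before)      # 713 days, 5:00:00
```

The second timestamp matches the `2024-02-25 19:00:00` event time shown for driver `1001` in the ingested batch, confirming that the online store now serves the value pushed from the stream.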

Let's stop the query.

In [13]:
query.stop()

### Cleanup
Finally, let's clean up the checkpoint directories from Spark.

In [14]:
import shutil

dir_path = '/tmp/checkpoint'

try:
    shutil.rmtree(dir_path)
except OSError as e:
    print("Error: %s : %s" % (dir_path, e.strerror))

## 7. Closing thoughts and future work

Although this workflow will allow data scientists to take advantage of streaming features, there are still many ways it can be improved. Here are a few things the Feast team is planning to work on in the near future:
* A unified Push API. This will enable ML engineers to push streaming data to the offline store as well as the online store. This capability has already shipped and will be integrated into this tutorial soon!
* A higher-level DSL for aggregations. This will enable data scientists to express common aggregate features (e.g. mean over the last 24 hours) with a DSL, instead of having to write a complex PySpark UDF on their own.
* The ability to run the stream transformation in batch mode. Stream data sources currently require a batch data source, and assume that the batch data source contains the transformed feature values. However, many teams choose to persist untransformed feature values to batch sources. If stream transformations can also be run in batch mode, teams can use their existing batch data sources.
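
To illustrate the kind of aggregate feature the DSL bullet refers to, here is what "mean over the last 24 hours" computes, written in plain Python. This is not a preview of any Feast API; the function `trailing_mean` and the sample events are hypothetical.

```python
from datetime import datetime, timedelta


def trailing_mean(events, as_of, window=timedelta(hours=24)):
    """Mean of values with as_of - window < timestamp <= as_of, or None if empty."""
    values = [v for ts, v in events if as_of - window < ts <= as_of]
    return sum(values) / len(values) if values else None


# Hypothetical (timestamp, value) events for one driver.
events = [
    (datetime(2022, 6, 26, 9, 0), 0.2),   # older than 24 hours, excluded
    (datetime(2022, 6, 26, 18, 0), 0.4),
    (datetime(2022, 6, 27, 12, 0), 0.6),
]

print(trailing_mean(events, as_of=datetime(2022, 6, 27, 15, 0)))
```

In a streaming engine the same computation is usually expressed as a windowed aggregation over event time, which is the boilerplate a higher-level DSL would hide.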