## Feature Store

When the data grows fast, performing ETL pipelines for multiple machine learning projects becomes expensive since repetitive operations. A feature store is a solution to this problem. It's possible to reuse the features in different projects and don't need to repeat similar processes in other projects.

This tutorial will cover how to create a feature store for Starbucks transactions. We will build ETLs pipelines using Butterfree library to upload data to a Feature Store so that data can be provided for machine learning algorithms, even for training or for prediction.

## Example:
Simulating the following scenario:

- We have a streaming JSON data source with events of starbucks orders being captured in real time.
- We have a csv data set with more information about drinks.


Objective: 

We want to parse the JSON from the streaming source, performing aggregations operations, and store all rows in a cheap structure(like s3) and get more recent transactions on a low latency database like Cassandra.

We desire to have an output with the schema:

- **id_employer**: int
- **name_employer**: string
- **name_client**: string
- **payment**: string
- **timestamp**: timestamp
- **product_name**: timestamp
- **product_size**: string
- **product_price**: int
- **percent_carbo**: float
- **final_price**: float


The following code blocks will show how to generate this feature set using Butterfree library using the above architecture:

- Apache Kafka as data sources (Streaming input data);

- A hive metastore to store metadata (like their schema and location) in a relational database.(For this tutorial we will use Postgresql)
- Apache Cassandra to store more recent data.
- Amazon S3 to store historical features or table views for debug mode.

<img src="arc.png">



<b>Historical Feature Store:</b> all features calculated over time;

<b>Online Feature Store:</b> hot/latest(last record by key) data stored at a low latency data storage(Cassandra).


## Observations

<b>In this tutorial, the historical data will be stored locally. However, you can easily add an s3 bucket. 
    </b>
    
  <b> Check the documentation here https://butterfree.readthedocs.io/en/latest/configuration.html?highlight=s3#historical-feature-store-spark-metastore-and-s3  </b>

<b>We will do a batch process, but you can switch to online processing with minor modifications
    </b>
    
<b> Check the documentation here https://butterfree.readthedocs.io/en/latest/stream.html </b>

### Download packages

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[6] --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,com.datastax.spark:spark-cassandra-connector_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.2.2 --conf 'spark.driver.extraJavaOptions=-Dhttp.proxyHost=192.168.5.8 -Dhttp.proxyPort=3128 -Dhttp.nonProxyHosts=localhost|127.0.0.1 -Dhttps.proxyHost=192.168.5.8 -Dhttps.proxyPort=3128 -Dhttps.nonProxyHosts=localhost|127.0.0.1' pyspark-shell"

### Spark Instance

Connecting to hive metastore

In [2]:
# setup spark
from pyspark import SparkContext, SparkConf
from pyspark.sql import session, SparkSession
from pyspark.sql import HiveContext
# butterfree spark client
from butterfree.clients import SparkClient


spark = (
    SparkSession
    .builder
    .appName("Feature Store")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") 
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("hive.metastore.uris", "thrift://localhost:9083")
    .enableHiveSupport()
    .getOrCreate())

sc=spark.sparkContext

# client
spark_client = SparkClient()
hive_context = HiveContext(sc)

22/12/28 18:48:00 WARN Utils: Your hostname, ANM-HOANGP46 resolves to a loopback address: 127.0.1.1; using 192.168.6.138 instead (on interface enx000ec6c370bb)
22/12/28 18:48:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/hoang/.ivy2/cache
The jars for the packages stored in: /home/hoang/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c2e2a41c-5882-41a4-b0db-743c28ba7e8b;1.0
	confs: [default]


:: loading settings :: url = jar:file:/home/hoang/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
	found com.datastax.spark#spark-cassandra-connector_2.12;3.2.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.2.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss

In [3]:
!pyspark --version && pip3 freeze | grep pyspark

22/12/28 18:48:04 WARN Utils: Your hostname, ANM-HOANGP46 resolves to a loopback address: 127.0.1.1; using 192.168.6.138 instead (on interface enx000ec6c370bb)
22/12/28 18:48:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.17
Branch HEAD
Compiled by user ubuntu on 2021-10-06T12:46:30Z
Revision 5d45a415f3a29898d92380380cfd82bfc7f579ea
Url https://github.com/apache/spark
Type --help for more information.
pyspark==3.2.0


### Extract

First, we need to define our data schemas.

In [4]:
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, DoubleType

schema_kafka = StructType([StructField('name_employer', StringType(), True),
                          StructField('id_employer', IntegerType(), True),
                          StructField('name_client', StringType(), True),
                          StructField('transaction_id', IntegerType(), True),
                          StructField('payment', StringType(), True),
                          StructField('timestamp', StringType(), True),
                          StructField('product_name', StringType(), True),
                          StructField('product_size', StringType(), True),
                          StructField('product_price', DoubleType(), True),
                          StructField('percent_discount', IntegerType(), True)])


schema_file = StructType([StructField('name', StringType(), True),
                         StructField('calories', IntegerType(), True),
                         StructField('fat_g', IntegerType(), True),
                         StructField('carb_g', IntegerType(), True),
                         StructField('fiber_g', IntegerType(), True),
                         StructField('protein', IntegerType(), True),
                         StructField('sodium', IntegerType(), True)])

Connecting with cassandra database

In [5]:
from butterfree.extract import Source
from butterfree.extract.readers import FileReader
from butterfree.extract.readers import KafkaReader

kafka_reader = KafkaReader(
    id="events",
    topic="queueing.transactions",
    value_schema=schema_kafka,
    connection_string="localhost:9092",
    stream=False
)

readers = [
    kafka_reader,
    FileReader(id="nutrients", path="data/starbucks-menu-nutrition-drinks.csv", format="csv", schema=schema_file)
]

query = """
select
    *
from
    events
    join nutrients
        on events.product_name = nutrients.name
"""

source = Source(readers=readers, query=query)

In [6]:
source_df = source.construct(spark_client)

                                                                                

In [7]:
# showing that it is a Spark's streaming df
source_df.isStreaming

False

In [8]:
# schema
source_df.printSchema()

root
 |-- name_employer: string (nullable = true)
 |-- id_employer: integer (nullable = true)
 |-- name_client: string (nullable = true)
 |-- transaction_id: integer (nullable = true)
 |-- payment: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_size: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- percent_discount: integer (nullable = true)
 |-- kafka_metadata: struct (nullable = false)
 |    |-- key: string (nullable = true)
 |    |-- topic: string (nullable = true)
 |    |-- value: string (nullable = true)
 |    |-- partition: integer (nullable = true)
 |    |-- offset: long (nullable = true)
 |    |-- timestamp: timestamp (nullable = true)
 |    |-- timestampType: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- calories: integer (nullable = true)
 |-- fat_g: integer (nullable = true)
 |-- carb_g: integer (nullable = true)
 |-- fiber_g: integer (nullable = true)
 

### Transform
- At the transform part, a set of `Feature` objects is declared.
- An Instance of `FeatureSet` is used to hold the features.
- A `FeatureSet` can only be created when it is possible to define a unique tuple formed by key columns and a time reference. This is an **architectural requirement** for the data. So least one `KeyFeature` and one `TimestampFeature` is needed.
- Every `Feature` needs a unique name, a description, and a data-type definition.

### Transform

In [9]:
from pyspark.sql import functions as F

from butterfree.transform import FeatureSet
from butterfree.transform.features import Feature, KeyFeature, TimestampFeature
from butterfree.transform.transformations import SQLExpressionTransform, SparkFunctionTransform, CustomTransform
from butterfree.transform.transformations.h3_transform import H3HashTransform
from butterfree.constants import DataType
from butterfree.transform.utils import Function


def divide(df, parent_feature, column1, column2):
    name = parent_feature.get_output_columns()[0]
    df = df.withColumn(name, F.col(column1) / F.col(column2))
    return df


keys = [
    KeyFeature(
        name="id_employer",
        description="Unique identificator code for employer.",
        from_column="id_employer",
        dtype=DataType.INTEGER,
    )
]

# from_ms = True because the data originally is not in a Timestamp format.
ts_feature = TimestampFeature(from_column="timestamp")

features = [
    Feature(
        name="name_employer",
        description="name_employer",
        dtype=DataType.STRING,
    ),
    Feature(
        name="name_client",
        description="name_client",
        dtype=DataType.STRING,
    ),
    Feature(
        name="product_name",
        description="product_name.",
        dtype=DataType.STRING,
    ),
    Feature(
        name="product_price",
        description="product_price.",
        dtype=DataType.FLOAT,
    ),
    Feature(
        name="payment",
        description="payment.",
        dtype=DataType.STRING,
    ),
    Feature(
        name="calories",
        description="calories",
        dtype=DataType.INTEGER,
    ),
    # custom transformation
    Feature(
           name="percent_carbo",
           description="percent_carbo",
           transformation=CustomTransform(transformer=divide, column1="carb_g", column2="calories"), 
           dtype=DataType.FLOAT,
    ),
    # SQL transformation
    Feature(
           name="final_price",
           description="percent_carbo",
           transformation=SQLExpressionTransform("product_price * ((100 - percent_discount)/100)"), 
           dtype=DataType.FLOAT,
    ),
]

# events will be sotred in our metasotore as a table. You can acess starbucks_order_events 
feature_set = FeatureSet(
    name="starbucks_order_events",
    entity="events",  # entity: to which "business context" this feature set belongs
    description="Features describring events about starbucks store.",
    keys=keys,
    timestamp=ts_feature,
    features=features,
)

In [10]:
feature_set_df = feature_set.construct(source_df, spark_client)

                                                                                

In [11]:
# schema
feature_set_df.printSchema()

root
 |-- id_employer: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- name_employer: string (nullable = true)
 |-- name_client: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: float (nullable = true)
 |-- payment: string (nullable = true)
 |-- calories: integer (nullable = true)
 |-- percent_carbo: double (nullable = true)
 |-- final_price: double (nullable = true)



In [12]:
feature_set_df.schema

StructType(List(StructField(id_employer,IntegerType,true),StructField(timestamp,TimestampType,true),StructField(name_employer,StringType,true),StructField(name_client,StringType,true),StructField(product_name,StringType,true),StructField(product_price,FloatType,true),StructField(payment,StringType,true),StructField(calories,IntegerType,true),StructField(percent_carbo,DoubleType,true),StructField(final_price,DoubleType,true)))

In [13]:
feature_set_df.toPandas()

Unnamed: 0,id_employer,timestamp,name_employer,name_client,product_name,product_price,payment,calories,percent_carbo,final_price
0,1,2020-08-04 12:04:05,Alex,Aaron Murphy,Caramel Macchiato,4.45,debit,250,0.140000,3.560
1,1,2020-08-02 19:04:05,Alex,Angela Leach,Cinnamon Dolce Latte,3.65,credit,260,0.153846,2.555
2,1,2020-08-02 09:04:05,Alex,Brandon Monroe,Caramel Macchiato,3.75,credit,250,0.140000,2.625
3,1,2020-08-02 21:04:05,Alex,Daniel Garcia,Cinnamon Dolce Latte,4.25,credit,260,0.153846,3.400
4,1,2020-08-03 16:04:05,Alex,Diana Jones,Cinnamon Dolce Latte,4.65,credit,260,0.153846,4.185
...,...,...,...,...,...,...,...,...,...,...
95,0,2020-08-02 13:04:05,Alicia,Richard Dodson,White Chocolate Mocha,4.75,credit,360,0.147222,4.275
96,0,2020-08-04 21:04:05,Alicia,Shane Mccormick,Cinnamon Dolce Latte,3.65,debit,260,0.153846,2.920
97,0,2020-08-04 13:04:05,Alicia,Shirley Wang,White Chocolate Mocha,4.45,credit,360,0.147222,3.115
98,0,2020-08-02 07:04:05,Alicia,Timothy Sanchez,White Chocolate Mocha,4.75,cash,360,0.147222,3.800


### Load

- Using debug mode to create a temporary view with the historical data

In [14]:
from butterfree.load.writers import (
    HistoricalFeatureStoreWriter,
    OnlineFeatureStoreWriter,
)
from butterfree.load import Sink

from butterfree.configs.db import CassandraConfig, MetastoreConfig
from butterfree.load.writers import OnlineFeatureStoreWriter

db_config  = CassandraConfig(
    username="cassandra", 
    password="mysecretpassword",
    host="localhost",
    keyspace="feature_store",
    stream_checkpoint_path="./",
    local_dc='datacenter1'
)

s3_config = MetastoreConfig(
    path="featurestore"
)

# writers = [HistoricalFeatureStoreWriter(debug_mode=True),OnlineFeatureStoreWriter(debug_mode=True)]
# writers = [HistoricalFeatureStoreWriter(debug_mode=True),OnlineFeatureStoreWriter(db_config=db_config)]
writers = [HistoricalFeatureStoreWriter(db_config=s3_config, database="default"),OnlineFeatureStoreWriter(db_config=db_config)]

sink = Sink(writers=writers)

### Cassandra tables

- Lets create a keyspace and a table to store the online features

In [15]:
from cassandra.cluster import Cluster, PlainTextAuthProvider

keyspace = "feature_store"
table_name = "starbucks_order_events"

cassandra_mapping = {
        "TimestampType": "timestamp",
        "BinaryType": "boolean",
        "BooleanType": "boolean",
        "DateType": "timestamp",
        "DecimalType": "decimal",
        "DoubleType": "double",
        "FloatType": "float",
        "IntegerType": "int",
        "LongType": "bigint",
        "StringType": "text",
        "ArrayType(LongType,true)": "frozen<list<bigint>>",
        "ArrayType(StringType,true)": "frozen<list<text>>",
        "ArrayType(FloatType,true)": "frozen<list<float>>",
    }

cluster = Cluster(['127.0.0.1'], auth_provider=PlainTextAuthProvider(username='cassandra', password='mysecretpassword'))
session = cluster.connect()

session.execute("CREATE KEYSPACE IF NOT EXISTS "+ keyspace +" WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 };")

sql = ", ".join([feature.name +str(" ") + cassandra_mapping[str(feature.dataType)] for feature in feature_set_df.schema]).replace("id_employer int", "id_employer int PRIMARY KEY")
sql = "CREATE TABLE IF NOT EXISTS {}.{} (" + sql + ");"
sql = sql.format(keyspace, table_name)
print(sql)

CREATE TABLE IF NOT EXISTS feature_store.starbucks_order_events (id_employer int PRIMARY KEY, timestamp timestamp, name_employer text, name_client text, product_name text, product_price float, payment text, calories int, percent_carbo double, final_price double);


In [16]:
session.execute(sql)
cluster.shutdown()

### Final Pipeline

In [17]:
from butterfree.pipelines import FeatureSetPipeline

pipeline = FeatureSetPipeline(source=source, feature_set=feature_set, sink=sink)

In [18]:
# asinc run when creating an in memory streaming view for sink 
pipeline.run()

22/12/28 18:48:17 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
22/12/28 18:48:19 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
                                                                                

## Showing the results

### Online features for cassandra

In [19]:
print(">>> Online Feature Store  table:")

from cassandra.cluster import Cluster, PlainTextAuthProvider
cluster = Cluster(['127.0.0.1'], auth_provider=PlainTextAuthProvider(username='cassandra', password='mysecretpassword'))
session = cluster.connect()
df = session.execute("SELECT * FROM feature_store.starbucks_order_events")
cluster.shutdown()
# Create data frame
df = spark.createDataFrame(df)
df.toPandas()

>>> Online Feature Store  table:


Unnamed: 0,id_employer,calories,final_price,name_client,name_employer,payment,percent_carbo,product_name,product_price,timestamp
0,5,260,4.185,Monica Wolf,Denver,credit,0.153846,Cinnamon Dolce Latte,4.65,2020-08-05 08:04:05
1,1,260,2.92,Seth Allen,Alex,credit,0.153846,Cinnamon Dolce Latte,3.65,2020-08-05 05:04:05
2,0,260,3.255,Anthony Gonzalez,Alicia,debit,0.153846,Cinnamon Dolce Latte,4.65,2020-08-05 09:04:05
3,2,260,2.555,Antonio Jacobs,Julian,debit,0.153846,Cinnamon Dolce Latte,3.65,2020-08-05 00:04:05
4,4,250,3.115,Alicia Ellis,Mark,credit,0.14,Caramel Macchiato,4.45,2020-08-05 07:04:05
5,6,250,3.115,Stephen Lambert,Luiza,debit,0.14,Caramel Macchiato,4.45,2020-08-05 04:04:05
6,3,360,3.56,Bethany Ryan,Cassandra,debit,0.147222,White Chocolate Mocha,4.45,2020-08-04 23:04:05


## Acessing metastore

### Historical features

In [20]:
hive_context.sql("show tables;").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|starbucks_order_e...|      false|
|         |              events|       true|
|         |           nutrients|       true|
+---------+--------------------+-----------+



In [21]:
print(">>> Historical Feature Store:")
hive_context.sql("select * from default.starbucks_order_events where name_employer='Alex';").toPandas()

>>> Historical Feature Store:


Unnamed: 0,id_employer,timestamp,name_employer,name_client,product_name,product_price,payment,calories,percent_carbo,final_price,year,month,day
0,1,2020-08-04 12:04:05,Alex,Aaron Murphy,Caramel Macchiato,4.45,debit,250,0.14,3.56,2020,8,4
1,1,2020-08-04 23:04:05,Alex,Eugene Maddox,White Chocolate Mocha,4.75,credit,360,0.147222,4.275,2020,8,4
2,1,2020-08-04 08:04:05,Alex,Jonathan Bradley,White Chocolate Mocha,3.75,cash,360,0.147222,3.0,2020,8,4
3,1,2020-08-04 05:04:05,Alex,Thomas Pace,White Chocolate Mocha,3.75,debit,360,0.147222,3.0,2020,8,4
4,1,2020-08-04 20:04:05,Alex,Thomas Stein,White Chocolate Mocha,3.75,cash,360,0.147222,3.375,2020,8,4
5,1,2020-08-04 03:04:05,Alex,Troy Fisher,Cinnamon Dolce Latte,3.65,cash,260,0.153846,2.92,2020,8,4
6,1,2020-08-02 19:04:05,Alex,Angela Leach,Cinnamon Dolce Latte,3.65,credit,260,0.153846,2.555,2020,8,2
7,1,2020-08-02 09:04:05,Alex,Brandon Monroe,Caramel Macchiato,3.75,credit,250,0.14,2.625,2020,8,2
8,1,2020-08-02 21:04:05,Alex,Daniel Garcia,Cinnamon Dolce Latte,4.25,credit,260,0.153846,3.4,2020,8,2
9,1,2020-08-02 22:04:05,Alex,Jason Martin,Cinnamon Dolce Latte,3.65,credit,260,0.153846,3.285,2020,8,2
