# Module 3: Scheduling batch transformations with dbt, Airflow, and Feast

## 1. Overview
In this notebook, we use dbt and Airflow to run batch transformations on a schedule, and we run materialization once dbt has finished running its incremental model.

<img src="../architecture.png" width="750"/>

## 2. Set up the feature store

In [1]:
%%capture
%env SNOWFLAKE_DEPLOYMENT_URL="[YOUR DEPLOYMENT]"
%env SNOWFLAKE_USER="[YOUR USER]"
%env SNOWFLAKE_PASSWORD="[YOUR PASSWORD]"
%env SNOWFLAKE_ROLE="[YOUR ROLE]"
%env SNOWFLAKE_WAREHOUSE="[YOUR WAREHOUSE]"
%env SNOWFLAKE_DATABASE="[YOUR DATABASE]"
%env USAGE=False

In [9]:
from feast import FeatureStore
from datetime import datetime
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) 

store = FeatureStore(repo_path=".")

### Fetch training data from offline store
This is a quick check that the features are present in the batch sources.

In [4]:
entity_sql = f"""
    SELECT
        NAMEORIG as USER_ID,
        TIMESTAMP as "event_timestamp"
    FROM {store.get_data_source("transactions_source").get_table_query_string()}
    WHERE TIMESTAMP BETWEEN '2021-07-14 00:00:00+0' and '2021-07-16 00:00:00+0'
"""
training_df = store.get_historical_features(
    entity_df=entity_sql,
    features=store.get_feature_service("model_v2"),
).to_df()
print(training_df.head(20))

        USER_ID            event_timestamp CREDIT_SCORE  7D_AVG_AMT
0   C1316679623 2021-07-14 02:21:31.267597          692    2454.810
1   C1817064970 2021-07-14 10:47:53.598600          692  232973.540
2   C1137064844 2021-07-14 02:24:54.111738          662  203089.480
3    C277832456 2021-07-15 23:36:52.611674          624  322265.510
4   C1629780233 2021-07-15 07:50:16.902199          578   81683.840
5   C1880755277 2021-07-15 00:36:09.758792          626   73723.490
6    C596660112 2021-07-14 10:48:42.592094          652   55033.350
7   C1622805037 2021-07-15 16:04:14.752046          653  261867.150
8   C1721218418 2021-07-15 07:47:29.340135          643  212377.195
9    C573703815 2021-07-15 10:52:38.382194          647  166923.465
10  C1700064043 2021-07-15 06:10:45.897233          672  184942.440
11  C1693799911 2021-07-15 06:11:17.415039          638   25347.370
12  C1258929790 2021-07-15 10:50:22.805955          641  175895.690
13   C917254266 2021-07-14 10:13:42.264576      

## Materialize batch features & fetch online features from Redis
To save time and money, we did not materialize the full dataset with Airflow. Here we materialize only the date range we need, so that the online store can serve the right data.

In [10]:
!feast materialize "2021-07-14 00:00:00+0" "2021-07-16 00:00:00+0"

Materializing 2 feature views from 2021-07-13 17:00:00-07:00 to 2021-07-15 17:00:00-07:00 into the redis online store.

aggregate_transactions_features:
100%|██████████████████████████████████████████████████████| 54991/54991 [00:01<00:00, 28759.14it/s]
credit_scores_features:
100%|████████████████████████████████████████████████████| 654482/654482 [00:22<00:00, 29090.35it/s]
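
The range passed to the CLI is written in UTC (`+0`), while the log line appears to report the same range in the local time zone of the machine that ran the command (UTC-7 in this run). A minimal sketch, using only the standard library and assuming a fixed UTC-7 offset, checks that both spellings refer to the same instants:

```python
from datetime import datetime, timedelta, timezone

# Range passed to `feast materialize`, expressed in UTC.
start_utc = datetime(2021, 7, 14, tzinfo=timezone.utc)
end_utc = datetime(2021, 7, 16, tzinfo=timezone.utc)

# Assumption: the machine that produced the log sat at a fixed UTC-7 offset.
local_tz = timezone(timedelta(hours=-7))

# astimezone() changes only the representation, not the instant.
start_local = start_utc.astimezone(local_tz)
end_local = end_utc.astimezone(local_tz)

print(start_local.isoformat(sep=" "))  # 2021-07-13 17:00:00-07:00
print(end_local.isoformat(sep=" "))    # 2021-07-15 17:00:00-07:00
```

Passing explicit offsets to the CLI, as the cell above does, avoids any ambiguity about which day boundaries are being materialized.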


#### SDK based online retrieval
Now we can retrieve the materialized features from Redis directly through the SDK. This is one of the most popular ways to retrieve features with Feast, since it lets you integrate with an existing service (e.g. a Flask app) that also handles model inference or pre/post-processing.

In [11]:
features = store.get_online_features(
    features=store.get_feature_service("model_v2"),
    entity_rows=[
        {
            "USER_ID": "C1783349759",
        }
    ],
).to_dict()

def print_online_features(features):
    for key, value in sorted(features.items()):
        print(key, " : ", value)

print_online_features(features)

7D_AVG_AMT  :  [58164.1796875]
CREDIT_SCORE  :  [659]
USER_ID  :  ['C1783349759']
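
The dictionary printed above is column-oriented: each feature name maps to a list with one value per entity row. A service that scores one entity at a time usually wants row-oriented records instead. The sketch below shows one way to reshape it in plain Python; `to_rows` and `example_features` are hypothetical names introduced for illustration, and the values are copied from the output above.

```python
from typing import Any


def to_rows(online_features: dict[str, list[Any]]) -> list[dict[str, Any]]:
    """Turn {feature: [v0, v1, ...]} into [{feature: v0}, {feature: v1}, ...]."""
    names = sorted(online_features)
    columns = [online_features[name] for name in names]
    # zip(*columns) walks the columns in lockstep, yielding one tuple per entity row.
    return [dict(zip(names, values)) for values in zip(*columns)]


# Same shape as the result of `.to_dict()` in the cell above.
example_features = {
    "7D_AVG_AMT": [58164.1796875],
    "CREDIT_SCORE": [659],
    "USER_ID": ["C1783349759"],
}

rows = to_rows(example_features)
print(rows)
```

Each element of `rows` can then be handed to whatever pre-processing or inference code the service already has.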


#### HTTP based online retrieval
We can also retrieve features from a deployed feature server. We previously deployed it with Docker Compose (see [docker-compose.yml](../docker-compose.yml)).

A central feature server can be preferable for several reasons. If you want an in-memory cache, a cache on a shared feature server sees requests from every team and can therefore be more effective than one cache per service. You can also manage rate limiting and access control centrally, upgrade Feast versions independently of the client services, and so on.
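
To make the caching idea concrete, here is a minimal sketch of a time-to-live cache placed in front of a feature lookup. It is not part of Feast: `TTLCache` and `fake_fetch` are hypothetical names, and `fake_fetch` merely stands in for a real call to the online store.

```python
import time
from typing import Any, Callable


class TTLCache:
    """Cache results of `fetch(key)` for `ttl_seconds` (hypothetical helper)."""

    def __init__(
        self,
        fetch: Callable[[str], Any],
        ttl_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]  # still fresh: serve from memory
        value = self._fetch(key)  # missing or expired: go to the backing store
        self._entries[key] = (now, value)
        return value


calls: list[str] = []


def fake_fetch(user_id: str) -> dict[str, Any]:
    """Stand-in for an online store lookup; records every real fetch."""
    calls.append(user_id)
    return {"USER_ID": user_id}


cache = TTLCache(fake_fetch, ttl_seconds=60)
cache.get("C1783349759")
cache.get("C1783349759")
print(len(calls))  # 1: the second lookup was served from the cache
```

The TTL bounds how stale a served feature can be, so it should be chosen with the freshness of the underlying features in mind.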

In [13]:
import requests
import json

online_request = {
  "feature_service": "model_v2",
  "entities": {
    "USER_ID": ["C1783349759"]
  }
}
r = requests.post('http://localhost:6566/get-online-features', data=json.dumps(online_request))
print(json.dumps(r.json(), indent=4, sort_keys=True))

{
    "metadata": {
        "feature_names": [
            "USER_ID",
            "CREDIT_SCORE",
            "7D_AVG_AMT"
        ]
    },
    "results": [
        {
            "event_timestamps": [
                "1970-01-01T00:00:00Z"
            ],
            "statuses": [
                "PRESENT"
            ],
            "values": [
                "C1783349759"
            ]
        },
        {
            "event_timestamps": [
                "2021-07-14T00:55:21Z"
            ],
            "statuses": [
                "PRESENT"
            ],
            "values": [
                659
            ]
        },
        {
            "event_timestamps": [
                "2021-07-14T06:00:58Z"
            ],
            "statuses": [
                "PRESENT"
            ],
            "values": [
                58164.1796875
            ]
        }
    ]
}
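
The response keeps feature names under `metadata` and the values, statuses, and timestamps in a parallel `results` list. The sketch below pairs them up into a single dictionary keyed by feature name. It assumes the response shape shown above, which may differ between Feast versions; `flatten_response` is a hypothetical helper, and the payload is copied from the output above.

```python
import json

# Response body copied from the output above, compacted onto fewer lines.
response_text = """
{
  "metadata": {"feature_names": ["USER_ID", "CREDIT_SCORE", "7D_AVG_AMT"]},
  "results": [
    {"event_timestamps": ["1970-01-01T00:00:00Z"], "statuses": ["PRESENT"], "values": ["C1783349759"]},
    {"event_timestamps": ["2021-07-14T00:55:21Z"], "statuses": ["PRESENT"], "values": [659]},
    {"event_timestamps": ["2021-07-14T06:00:58Z"], "statuses": ["PRESENT"], "values": [58164.1796875]}
  ]
}
"""


def flatten_response(payload: dict, row: int = 0) -> dict:
    """Pair each feature name with its value, status, and timestamp for one entity row."""
    names = payload["metadata"]["feature_names"]
    return {
        name: {
            "value": result["values"][row],
            "status": result["statuses"][row],
            "event_timestamp": result["event_timestamps"][row],
        }
        for name, result in zip(names, payload["results"])
    }


flat = flatten_response(json.loads(response_text))
for name, info in sorted(flat.items()):
    print(name, ":", info["value"], f"({info['status']}, {info['event_timestamp']})")
```

Checking `status` before using a value is worthwhile, since a feature that was never materialized for an entity would not come back as `PRESENT`.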


## 5. Generating fresher features via stream transformations
Now we push streaming features into Feast by ingesting events from Kafka and processing them with Spark Structured Streaming.
- These features can then be post-processed and combined with other features or request data in on-demand transforms.
- For example, you could push the last 5 transactions and compute their average in an on-demand transform.
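
The "average of the last 5 transactions" example can be sketched in plain Python. This is only the arithmetic such a transform would perform, not Feast's on-demand feature view API; `average_of_last` and the sample amounts are hypothetical.

```python
from typing import Optional, Sequence


def average_of_last(amounts: Sequence[float], n: int = 5) -> Optional[float]:
    """Average of the most recent `n` amounts (oldest first); None if there are none."""
    if n <= 0:
        raise ValueError("n must be positive")
    recent = amounts[-n:]  # fewer than n values is fine: use what is available
    if not recent:
        return None
    return sum(recent) / len(recent)


# Hypothetical transaction amounts, ordered from oldest to newest.
amounts = [100.0, 250.0, 50.0, 75.0, 300.0, 25.0]
print(average_of_last(amounts))  # 140.0, the mean of the last five values
```

Keeping the raw recent transactions in the online store and aggregating at request time lets the same pushed data feed several derived features.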

Feast will help manage both batch and streaming sources for you. You can run `feast materialize-incremental` as well as ingest streaming features to the same online store.

### 5a. Connect to Kafka from Spark


In [14]:
import os

from pyspark.sql import SparkSession
# Import only the functions used below instead of a wildcard import
from pyspark.sql.functions import avg, col, from_json, window

# Pull in the Kafka connector; this must be set before the SparkSession is created
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"

spark = SparkSession.builder.master("local").appName("feast-spark").getOrCreate()
# Reduce partitions since default is 200 which will be slow on a local machine
spark.conf.set("spark.sql.shuffle.partitions", 5)

schema = (
    "STRUCT<"
        "amount: DOUBLE, "
        "isFlaggedFraud: BIGINT, "
        "isFraud: BIGINT, "
        "nameDest: STRING, "
        "nameOrig: STRING, "
        "timestamp: TIMESTAMP, "
        "type_CASH_IN: BIGINT, "
        "type_CASH_OUT: BIGINT, "
        "type_DEBIT: BIGINT, "
        "type_PAYMENT: BIGINT, "
        "type_TRANSFER: BIGINT"
    ">"
)

# Subscribe to the "transactions" topic from the earliest offset and parse each JSON value into typed columns
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "transactions")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr('CAST(value AS STRING)')
    .select(from_json('value', schema).alias("temp"))
    .select("temp.*")
)


In [15]:
import pandas as pd
def send_to_feast(df, epoch):
    pandas_df: pd.DataFrame = df.toPandas()
    if pandas_df.empty:
        return
    
    if "TIMESTAMP" in pandas_df:
        print("processing window")
        # Keep only the latest window per USER_ID: sort ascending, then take the last row of each group
        pandas_df = pandas_df.sort_values(by=["USER_ID", "TIMESTAMP"]).groupby("USER_ID").nth(-1)
        store.push("transactions_7d", pandas_df)
        print(pandas_df)
    print(f"Num rows: {len(pandas_df.index)}")

seven_day_avg = (
    df.withWatermark("timestamp", "1 second") 
        .groupBy("nameOrig", window(timeColumn="timestamp", windowDuration="7 day", slideDuration="1 hour"))
        .agg(
            avg("amount").alias("7D_AVG_AMT")
        )
        .select(col("nameOrig").alias("USER_ID"), col("window.end").alias("TIMESTAMP"), "7D_AVG_AMT")
)

query_1 = (
    seven_day_avg
        .writeStream
        .outputMode("append") 
        .option("checkpointLocation", "/tmp/feast-workshop/q3/")
        .trigger(processingTime="30 seconds")
        .foreachBatch(send_to_feast)
        .start()
)

query_1.awaitTermination(timeout=30)
query_1.stop()

22/10/24 23:34:07 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/10/24 23:34:12 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.




22/10/24 23:34:17 WARN HDFSBackedStateStoreProvider: The state for version 5 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.

processing window
                      TIMESTAMP     7D_AVG_AMT
USER_ID                                       
C1002549077 2024-09-08 06:00:00  492373.865000
C1017359561 2024-09-08 06:00:00  167760.910000
C1029224019 2024-09-08 06:00:00  170918.001000
C103805766  2024-09-08 06:00:00  167167.800000
C1054169081 2024-09-08 06:00:00  259062.940000
...                         ...            ...
C975618271  2024-09-08 06:00:00   93361.601429
C976047598  2024-09-08 06:00:00   12879.980000
C980973980  2024-09-08 06:00:00    2703.550000
C988211716  2024-09-08 06:00:00  114029.610000
C989890503  2024-09-08 06:00:00  186932.563333

[259 rows x 2 columns]
Num rows: 259


                                                                                

processing window
                      TIMESTAMP     7D_AVG_AMT
USER_ID                                       
C1002549077 2024-09-08 07:00:00  451469.744286
C1017359561 2024-09-08 07:00:00  167760.910000
C1029224019 2024-09-08 07:00:00  170918.001000
C103805766  2024-09-08 07:00:00  167167.800000
C1054169081 2024-09-08 07:00:00  259062.940000
...                         ...            ...
C975618271  2024-09-08 07:00:00   93361.601429
C976047598  2024-09-08 07:00:00   12879.980000
C980973980  2024-09-08 07:00:00    2703.550000
C988211716  2024-09-08 07:00:00  114029.610000
C989890503  2024-09-08 07:00:00  186932.563333

[266 rows x 2 columns]
Num rows: 266
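
The aggregation above uses a 7-day window that slides every hour, so a single transaction contributes to 7 × 24 = 168 overlapping windows. That is why `send_to_feast` reduces each batch to one window per user before pushing. The sketch below enumerates those windows with the standard library only. It assumes windows are aligned to the Unix epoch with no start offset, which matches Spark's documented default for `window`; `sliding_windows` is a hypothetical helper and the event time is illustrative.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(ts, duration=timedelta(days=7), slide=timedelta(hours=1)):
    """Return every [start, end) window containing `ts`, earliest first."""
    # Latest window start at or before ts, aligned to the slide interval.
    start = ts - ((ts - EPOCH) % slide)
    windows = []
    # Step backwards until the window no longer reaches ts (the end is exclusive).
    while start + duration > ts:
        windows.append((start, start + duration))
        start -= slide
    return windows[::-1]


# Illustrative event time inside the range materialized earlier.
event_time = datetime(2021, 7, 14, 10, 30, tzinfo=timezone.utc)
windows = sliding_windows(event_time)

print(len(windows))    # 168
print(windows[0][1])   # earliest window end: 2021-07-14 11:00:00+00:00
print(windows[-1][1])  # latest window end: 2021-07-21 10:00:00+00:00
```

In append mode a window is emitted only after the watermark passes its end, so the value pushed for a user is the most recently closed window rather than one still accumulating events.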


In [17]:
features_new = store.get_online_features(
    features=store.get_feature_service("model_v2"),
    entity_rows=[
        {
            "USER_ID": "C1783349759",
        }
    ],
).to_dict()

print("=== FEATURES_OLD ===")
print_online_features(features)
print("=== FEATURES_NEW ===")
print_online_features(features_new)

=== FEATURES_OLD ===
7D_AVG_AMT  :  [58164.1796875]
CREDIT_SCORE  :  [659]
USER_ID  :  ['C1783349759']
=== FEATURES_NEW ===
7D_AVG_AMT  :  [105369.8125]
CREDIT_SCORE  :  [659]
USER_ID  :  ['C1783349759']
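
Comparing the two printouts by eye works for three features but not for a larger feature service. The sketch below is a small helper that reports only the features whose values differ between two `.to_dict()` results; `changed_features` is a hypothetical name, and the values are copied from the output above.

```python
from typing import Any


def changed_features(old: dict[str, Any], new: dict[str, Any]) -> dict[str, tuple]:
    """Map each feature whose value differs to an (old, new) pair."""
    return {
        name: (old.get(name), new.get(name))
        for name in sorted(set(old) | set(new))
        if old.get(name) != new.get(name)
    }


# Values copied from the FEATURES_OLD and FEATURES_NEW printouts above.
old_snapshot = {"7D_AVG_AMT": [58164.1796875], "CREDIT_SCORE": [659], "USER_ID": ["C1783349759"]}
new_snapshot = {"7D_AVG_AMT": [105369.8125], "CREDIT_SCORE": [659], "USER_ID": ["C1783349759"]}

for name, (before, after) in changed_features(old_snapshot, new_snapshot).items():
    print(f"{name}: {before} -> {after}")
```

Here only `7D_AVG_AMT` changes, which is the expected result: the streaming job pushes that feature, while `CREDIT_SCORE` still comes from the batch materialization.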


In [18]:
import shutil

dir_path = '/tmp/feast-workshop/'

try:
    shutil.rmtree(dir_path)
except OSError as e:
    print(f"Error: {dir_path} : {e.strerror}")