## Direct online store updates with Spark
In this section we follow the paradigm of updating both the online and the offline store directly from the ETL/transformation pipeline. In this demo we omit the offline store updates.

1. Use `pyspark` to process the stream
2. Use `spark-redis` to push the feature vectors directly into Redis
3. Use `redis-py` to access the online store data

<img src="./images/diagram1.png" width="800">

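Create a Spark session and define a schema to parse incoming messages.

```python
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import (FloatType, IntegerType, LongType,
                               StructField, StructType)

# spark-redis reads its connection settings from the Spark config;
# this assumes the same REDIS_* environment variables used later with redis-py
spark = (SparkSession.builder
         .config("spark.redis.host", os.environ["REDIS_HOST"])
         .config("spark.redis.port", os.environ["REDIS_PORT"])
         .config("spark.redis.auth", os.environ["REDIS_PASSWORD"])
         .getOrCreate())

schema = StructType([
    StructField("event_timestamp", LongType(), False),
    StructField("driver_id", LongType(), False),
    StructField("conv_rate", FloatType(), False),
    StructField("acc_rate", FloatType(), False),
    StructField("avg_daily_trips", IntegerType(), False),
    StructField("created", LongType(), False),
])
```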

```python
parsed_stream = spark \
    .readStream \
    .schema(schema) \
    .parquet("./data")
```

Define the stream-processing function. Only the latest update for each entity is saved to the online store.


<img src="./images/saving_schema.png" width="400">
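To make the "latest update per entity" rule concrete, here is a plain-Python sketch of the same deduplication applied to a single micro-batch. The function `latest_per_entity` and the sample rows are hypothetical; the Spark job performs this step on a DataFrame rather than on a list of dicts.

```python
from typing import Dict, Iterable, List


def latest_per_entity(rows: Iterable[dict], key: str = "driver_id",
                      ts: str = "event_timestamp") -> List[dict]:
    """Keep only the row with the greatest timestamp for each entity id."""
    latest: Dict[object, dict] = {}
    for row in rows:
        current = latest.get(row[key])
        # Replace the stored row only when this one is strictly newer,
        # so on equal timestamps the first row seen is kept
        if current is None or row[ts] > current[ts]:
            latest[row[key]] = row
    return list(latest.values())


# Hypothetical micro-batch: driver 1001 has two updates, 1002 has one
batch = [
    {"driver_id": 1001, "event_timestamp": 10, "conv_rate": 0.1},
    {"driver_id": 1001, "event_timestamp": 20, "conv_rate": 0.2},
    {"driver_id": 1002, "event_timestamp": 15, "conv_rate": 0.3},
]
print(latest_per_entity(batch))
```

Only one row per driver reaches Redis, which keeps the online store holding a single, most recent feature vector per entity.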

```python
from pyspark.sql import Window
from pyspark.sql import functions as F

def process_stream(batch_df, batch_id):
    # Keep only the latest row per driver within this micro-batch
    w = Window.partitionBy("driver_id").orderBy(F.col("event_timestamp").desc())
    latest_df = batch_df.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn")
    # spark-redis stores each row as a hash under the key "drivers:<driver_id>"
    latest_df.write.format("org.apache.spark.sql.redis") \
        .option("table", "drivers").option("key.column", "driver_id") \
        .mode("append").save()
    # Here you can also write to your offline store

query = parsed_stream.writeStream.outputMode("append").foreachBatch(process_stream).start()
```


```
22/06/10 22:11:56 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-5b57ec11-9c0c-43a6-8c6c-82517cf34df4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/06/10 22:11:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
```


### Getting features from Redis
Once the latest feature values are in Redis, the application needs to read them at inference time and pass them to its model.
In this demo we will use `redis-py` to create the connection and pull the data.

```python
import os
import redis

r = redis.Redis(host=os.environ["REDIS_HOST"], port=int(os.environ["REDIS_PORT"]),
                password=os.environ["REDIS_PASSWORD"])
```

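Define the retrieval function. `spark-redis` stores each row as a Redis hash under the key `<table>:<key.column value>` (for example, `drivers:1001`), so for each requested entity the function issues one `HMGET` for the requested features and sends all of them in a single pipeline.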

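```python
from typing import List, Optional, Sequence

def get_online_feature(r: redis.Redis, table: str, entities: Sequence,
                       features: Sequence[str]) -> List[List[Optional[bytes]]]:
    # Queue one HMGET per entity; transaction=False skips MULTI/EXEC
    pipe = r.pipeline(transaction=False)
    for entity_id in entities:
        pipe.hmget(f"{table}:{entity_id}", *features)
    # Send all queued commands together and return the replies in request order
    return pipe.execute()
```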
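Pipelining matters because, without it, every `HMGET` pays a full network round trip. As a back-of-the-envelope model (assuming the per-command processing time $t_{\text{cmd}}$ is small compared with the round-trip time $\mathrm{RTT}$, which is typical for `HMGET` on small hashes), fetching $N$ entities costs roughly:

$$
T_{\text{sequential}} \approx N \cdot (\mathrm{RTT} + t_{\text{cmd}}), \qquad
T_{\text{pipelined}} \approx \mathrm{RTT} + N \cdot t_{\text{cmd}}
$$

For example, with $N = 5$ and an assumed $\mathrm{RTT} = 20$ ms, sequential calls take about $5 \times 20 = 100$ ms, while the pipelined version stays close to 20 ms.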

Check network latency

```python
%%time
r.ping()
```

```
CPU times: user 4.75 ms, sys: 2.73 ms, total: 7.48 ms
Wall time: 21.3 ms

True
```
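A single `%%time` run is noisy. The following is a minimal sketch for repeating a call and summarizing its latency; the helper `measure_latency_ms` is hypothetical (it is not part of `redis-py`), and the stand-in workload lets it run without a Redis server.

```python
import math
import statistics
import time
from typing import Callable, Dict


def measure_latency_ms(call: Callable[[], object], n: int = 100) -> Dict[str, float]:
    """Invoke `call` n times and summarize its wall-clock latency in milliseconds."""
    if n < 1:
        raise ValueError("n must be at least 1")
    samples = []
    for _ in range(n):
        start = time.perf_counter()
        call()
        samples.append((time.perf_counter() - start) * 1000.0)
    samples.sort()
    # Nearest-rank 99th percentile (1-based rank converted to a 0-based index)
    p99 = samples[math.ceil(0.99 * n) - 1]
    return {"p50": statistics.median(samples), "p99": p99, "max": samples[-1]}


# Stand-in workload so the sketch runs without a Redis server
print(measure_latency_ms(lambda: sum(range(1000)), n=50))
```

With the client created above, `measure_latency_ms(r.ping, n=200)` would report the ping latency distribution instead of a single sample.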

Retrieve features

```python
%%time
get_online_feature(r, "drivers", [1001, 1002, 1003, 1004, 1005], ["avg_daily_trips", "conv_rate", "acc_rate"])
```

```
CPU times: user 2.73 ms, sys: 1.61 ms, total: 4.34 ms
Wall time: 20.5 ms

[[b'303', b'0.5645701', b'0.6563286'],
 [b'787', b'0.38299075', b'0.181249'],
 [b'596', b'0.49776912', b'0.25255522'],
 [b'832', b'0.40254834', b'0.20854962'],
 [b'606', b'0.0778933', b'0.66774946']]
```
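The replies are raw bytes, and `HMGET` returns `None` for fields (or whole keys) that do not exist. Before passing them to a model, they usually need to be converted to typed values. A minimal sketch, assuming the feature types from the Spark schema; the `FEATURE_TYPES` mapping, `decode_features`, and the missing entity `9999` are hypothetical. Alternatively, passing `decode_responses=True` to `redis.Redis` makes `redis-py` return `str` instead of `bytes`, leaving only the numeric conversion.

```python
from typing import Dict, List, Optional, Sequence

# Hypothetical mapping from feature name to Python type, mirroring the Spark schema
FEATURE_TYPES = {"avg_daily_trips": int, "conv_rate": float, "acc_rate": float}


def decode_features(
    entities: Sequence[int],
    features: Sequence[str],
    raw: List[List[Optional[bytes]]],
) -> Dict[int, Optional[Dict[str, object]]]:
    """Convert HMGET replies (bytes or None) into typed feature dicts per entity."""
    decoded: Dict[int, Optional[Dict[str, object]]] = {}
    for entity_id, values in zip(entities, raw):
        # A missing key makes HMGET return None for every requested field
        if all(v is None for v in values):
            decoded[entity_id] = None
            continue
        decoded[entity_id] = {
            # Unknown feature names fall back to plain str
            name: None if v is None else FEATURE_TYPES.get(name, str)(v.decode())
            for name, v in zip(features, values)
        }
    return decoded


raw = [[b"303", b"0.5645701", b"0.6563286"], [None, None, None]]
print(decode_features([1001, 9999], ["avg_daily_trips", "conv_rate", "acc_rate"], raw))
```

Returning `None` for an unknown entity lets the caller decide whether to fall back to default feature values or skip the prediction.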

### Summary
We used `spark-redis` to write streaming data into Redis so that online features are ready to serve, and `redis-py` to read them back with low latency.

Target persona: MLOps engineers, data engineers, and platform architects.
Consider using this approach if:
1. You are building your own feature store product and need to be:
    1. in control of your data path;
    1. in control of your data modeling, access patterns, and data serialization or compression.


Moreover, you can use `spark-redis` to consume Redis Streams and use Redis as your stream message broker (not in the scope of this demo).

<img src="./images/redis-streams.png" width="800">