# Amazon Keyspaces feature store demo

One of the central elements of an ML platform is the online feature store. It allows multiple ML models to retrieve hundreds or thousands of features with low latency, which enables AI for real-time use cases. Customers such as SumUp have built feature stores on Amazon Keyspaces. SumUp processes thousands of transactions every second, with volume spikes during peak hours, and steady growth that doubles its transaction volume every year. Because of this, its ML platform requires a low-latency feature store that is also highly reliable and scalable.

To train ML models, we need historical data. During this phase, data scientists experiment with different features to test which ones produce the best model. From a platform perspective, we need to support bulk read and write operations. Read latency isn’t critical at this stage because the data is read into training jobs. After the models are trained and moved to production for real-time inference, the platform requirements change: we need to support low-latency reads and serve only the latest feature data. This interactive notebook reproduces some of the steps to integrate Amazon Keyspaces as an online feature store to power ML/AI workloads.
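The two access patterns can be sketched in plain Python: training reads the full history, while online inference needs only the newest row per key. The household IDs and values below are made up, the dict merely stands in for a store, and `latest_per_key` is a hypothetical helper.

```python
from datetime import date

# Made-up feature history: (household id, feature date, energy_sum).
history = [
    ("household_1", date(2014, 2, 26), 10.1),
    ("household_1", date(2014, 2, 28), 12.4),
    ("household_2", date(2014, 2, 27), 7.9),
    ("household_1", date(2014, 2, 27), 11.0),
]


def latest_per_key(rows):
    """Return {id: (date, value)}, keeping only the most recent row per id."""
    latest = {}
    for key, day, value in rows:
        # Keep this row if it is the first one for the key or newer than the stored one.
        if key not in latest or day > latest[key][0]:
            latest[key] = (day, value)
    return latest


online_view = latest_per_key(history)
print(online_view["household_1"])  # → (datetime.date(2014, 2, 28), 12.4)
```

The offline store keeps all four rows for training; the online view keeps one row per household.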



#### First: Introduction to notebooks and AWS Glue interactive sessions

To start using your notebook, you need to start an AWS Glue interactive session. Jupyter magics are commands that can be run at the beginning of a cell or as a whole cell body. Magics start with % for line magics and %% for cell magics. Line magics such as %region and %connections can be combined with other magics in the same cell, or with code in the cell body.
* To run a step, click the cell and choose the 'Run' button in the toolbar.
* To stop an execution, choose the 'Interrupt kernel' (stop) button.
* To restart the Glue session, run the %stop_session magic in the cell below; the next code cell you run starts a new session.
* If this is your first time, proceed to the next step.


In [None]:
#%help
# The %stop_session magic stops the current Glue interactive session so you can repeat steps if necessary.
# Uncomment %help above to list all available magics.
%stop_session

### Configure Glue for interactive session

Another important consideration is that a single feature job populates both feature stores. Otherwise, SumUp would have to maintain two sets of code or pipelines for each feature creation job. We use AWS Glue, but you could also use Amazon EMR to create the features with PySpark DataFrames. The same DataFrame is written to both Delta Lake and Amazon Keyspaces, which removes the need for separate pipelines.

![image](https://d2908q01vomqb2.cloudfront.net/b6692ea5df920cad691c20319a6fffd7a4a766b8/2022/07/06/BDB-1587-image001.png)


The following cell configures AWS Glue, PySpark, the Spark Cassandra Connector, and Delta Lake on Amazon S3. Run it to set the Spark configuration that connects to Amazon Keyspaces, where the features are stored later in this example.


In [None]:
%glue_version 3.0
%%configure 
{
    "--max_concurrent_runs":"1",
    "--enable-job-insights":"true",
    "--enable-metrics":"true",
    "--enable-spark-ui": "true",
    "--spark-event-logs-path": "s3://800S3BUCKET/spark-logs/",
    "--datalake-formats": "delta",
    "--enable-observability-metrics":"true",
    "--user-jars-first": "true",
    "--extra-jars":"s3://800S3BUCKET/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar,s3://800S3BUCKET/jars/aws-sigv4-auth-cassandra-java-driver-plugin-4.0.9-shaded.jar,s3://800S3BUCKET/jars/spark-extension_2.12-2.8.0-3.4.jar,s3://800S3BUCKET/jars/amazon-keyspaces-helpers-1.0-SNAPSHOT.jar",
    "--extra-files":"s3://800S3BUCKET/conf/keyspaces-application.conf",
    "--TempDir": "s3://800S3BUCKET/temp",
    "--conf": "spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions,io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.myCatalog=com.datastax.spark.connector.datasource.CassandraCatalog --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore --conf spark.cassandra.connection.config.profile.path=keyspaces-application.conf --conf spark.task.maxFailures=100 --conf spark.cassandra.output.ignoreNulls=true --conf directJoinSetting=on --conf spark.cassandra.output.concurrent.writes=1 --conf spark.cassandra.output.batch.grouping.key=none --conf spark.cassandra.output.batch.size.rows=10"
}
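Because every Spark setting is packed into a single `--conf` string, a repeated or misspelled key is easy to miss. If you generate the `%%configure` body rather than typing it, one option is to keep the settings in a Python dict, which holds each key only once, and join them into the `key=value --conf key=value` layout used above. `build_conf_arg` is a hypothetical helper, not part of AWS Glue, and the settings shown are only a subset of those in the cell.

```python
def build_conf_arg(settings):
    """Join Spark settings into one Glue --conf value.

    The first pair is written bare and every later pair is prefixed with
    "--conf ", matching the layout of the --conf argument in the cell above.
    A dict holds each key once, so duplicate settings cannot occur.
    """
    pairs = [f"{key}={value}" for key, value in settings.items()]
    return " --conf ".join(pairs)


# A subset of the settings used in the configuration cell.
spark_settings = {
    "spark.sql.catalog.myCatalog": "com.datastax.spark.connector.datasource.CassandraCatalog",
    "spark.cassandra.connection.config.profile.path": "keyspaces-application.conf",
    "spark.cassandra.output.ignoreNulls": "true",
    "directJoinSetting": "on",
}

conf_value = build_conf_arg(spark_settings)
print(conf_value)
```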


#### Connect and show Keyspaces

The following cell creates a new Glue interactive session. The session connects to Amazon Keyspaces through the Spark Cassandra Connector and lists the existing keyspaces. In the previous cell, we registered a Spark SQL catalog for Keyspaces named myCatalog. Spark SQL uses an ANSI-compliant SQL dialect rather than a Hive-compliant one. The connector automatically pushes down all valid predicates to Keyspaces, and the data source reads only the columns that the query selects.


In [None]:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import rand 
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

print("Connected")


spark.sql("SHOW NAMESPACES FROM myCatalog").show()




#### Extract dataset

Delta Lake is an open-source storage layer that supports ACID transactions and is fully compatible with Apache Spark, making it highly performant at bulk read and write operations. You can store Delta Lake tables on Amazon Simple Storage Service (Amazon S3), which makes it a good fit for the offline feature store. Data scientists can use this stack to train models against the offline feature store (Delta Lake). When the trained models are moved to production, we switch to the online feature store (Amazon Keyspaces), which offers the latest feature set, scalable reads, and much lower latency.

In the following example, you first read a sample dataset by loading the UK energy data into a Spark DataFrame. Update the value of s3_uri to point to the bucket where your data resides. We also cache the data so that Spark keeps the dataset in memory while we process it. Finally, the day column is loaded as a string, so we create a new column, day_date, of type date. This lets us calculate features over different time windows and develop additional features to experiment with. When the cell finishes, it prints the new schema with the date column.

In [None]:
import pyspark.sql.functions as F

s3_uri = "s3://800S3BUCKET/sample-data/daily_dataset.csv"

print("Loading Data")

df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(s3_uri)
)

df.cache()

print("Records Read: {0:,}".format( df.count() ))

# Convert the string 'day' column into a DATE column named day_date
df = df.withColumn('day_date', F.to_date('day', 'yyyy-MM-dd'))

df.printSchema()
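Converting the string to a real date is what makes date comparisons and arithmetic possible, which the window features in the next step rely on. The sketch below shows the same conversion in plain Python on two made-up rows shaped like the notebook's dataset (only a few columns are included); it mirrors `F.to_date('day', 'yyyy-MM-dd')` but is not the Spark code path.

```python
import csv
import io
from datetime import datetime

# Made-up rows with a subset of the dataset's columns.
sample_csv = """household_id,day,energy_sum,energy_count
household_1,2014-02-27,11.0,48
household_1,2014-02-28,12.4,48
"""

rows = []
for record in csv.DictReader(io.StringIO(sample_csv)):
    # Parse the 'day' string (yyyy-MM-dd) into a date object stored as day_date.
    record["day_date"] = datetime.strptime(record["day"], "%Y-%m-%d").date()
    rows.append(record)

print(rows[0]["day_date"].isoformat())  # → 2014-02-27
# Dates support arithmetic; the original strings do not.
print((rows[1]["day_date"] - rows[0]["day_date"]).days)  # → 1
```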

### Create Features 

Selecting relevant features and engineering new ones is key to building high-performing models. You will compute the sum, count, max, mean, and standard deviation of the daily energy readings over windows of 3, 6, and 12 months ending on 28 February 2014.



In [None]:
from datetime import date
from dateutil.relativedelta import relativedelta
from pyspark.sql import Window
import pyspark.sql.functions as F

# Every feature window ends on this reference date.
reference_date = date(2014, 2, 28)

# With no orderBy, Spark uses a frame that covers every row of the partition,
# so each row receives the household-level aggregate. (An ordered window
# without an explicit frame would produce a running total instead.)
window = Window.partitionBy("household_id")

# (column suffix, window length in months)
periods = [("3months", 3), ("6months", 6), ("1yr", 12)]

# (feature name, aggregate function, source column)
aggregates = [
    ("sum", F.sum, "energy_sum"),
    ("count", F.sum, "energy_count"),    # total number of readings in the window
    ("max", F.max, "energy_max"),
    ("mean", F.avg, "energy_mean"),
    ("stddev", F.stddev, "energy_sum"),  # sample std dev of the daily totals
]

for name, agg, source in aggregates:
    for suffix, months in periods:
        start = reference_date - relativedelta(months=months)
        # Rows outside the window become null and are ignored by the aggregate,
        # so they do not pull the mean or standard deviation toward zero.
        in_window = F.when(F.col("day_date") >= F.lit(start), F.col(source))
        df = df.withColumn(f"energy_{name}_{suffix}", agg(in_window).over(window))

df.printSchema()
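To make the window logic concrete, here is the same idea for a single household in plain Python. It is a sketch with made-up values; `months_before` and `window_features` are hypothetical helpers, with `months_before` imitating how `relativedelta(months=n)` clamps the day at the end of a shorter month. Rows outside the window are excluded rather than counted as 0, and `statistics.stdev` is the sample standard deviation, like Spark's `stddev`.

```python
import calendar
from datetime import date
from statistics import mean, stdev

REFERENCE_DATE = date(2014, 2, 28)  # same reference date as the PySpark cell


def months_before(d, months):
    """Shift a date back by whole months, clamping the day to the month's length."""
    total = d.year * 12 + (d.month - 1) - months
    year, month_index = divmod(total, 12)
    month = month_index + 1
    day = min(d.day, calendar.monthrange(year, month)[1])
    return date(year, month, day)


def window_features(daily, months):
    """Aggregate the (date, energy_sum) pairs that fall inside the window."""
    start = months_before(REFERENCE_DATE, months)
    values = [v for d, v in daily if d >= start]  # out-of-window rows are skipped
    return {
        "sum": sum(values),
        "max": max(values) if values else None,
        "mean": mean(values) if values else None,
        "stddev": stdev(values) if len(values) > 1 else None,
    }


# Made-up daily energy_sum values for one household.
daily = [(date(2013, 6, 15), 9.0), (date(2013, 11, 30), 10.0),
         (date(2014, 1, 31), 12.0), (date(2014, 2, 28), 14.0)]

print(months_before(REFERENCE_DATE, 3))  # → 2013-11-28
for label, months in [("3months", 3), ("6months", 6), ("1yr", 12)]:
    print(label, window_features(daily, months))
```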

### Create table in online store
The following command creates a table in Amazon Keyspaces to hold the new features. You may see the error "AnalysisException: Couldn't find 800KEYSPACE.energy_data_features or any similarly named keyspace and table pairs". This happens because Amazon Keyspaces creates tables asynchronously: the table can be used only after its status changes to ACTIVE, so wait a moment before running the next steps.

In [None]:
spark.sql("""
CREATE TABLE myCatalog.800KEYSPACE.energy_data_features (
    id String,
    day_date Date,
    energy_median Double,
    energy_mean Double,
    energy_max Double,
    energy_count Int,
    energy_std Double,
    energy_sum Double,
    energy_min Double,
    energy_sum_3months Double,
    energy_sum_6months Double,
    energy_sum_1yr Double,
    energy_count_3months Long,
    energy_count_6months Long,
    energy_count_1yr Long,
    energy_max_3months Double,
    energy_max_6months Double,
    energy_max_1yr Double,
    energy_mean_3months Double,
    energy_mean_6months Double,
    energy_mean_1yr Double,
    energy_stddev_3months Double,
    energy_stddev_6months Double,
    energy_stddev_1yr Double)
  PARTITIONED BY (id)
  TBLPROPERTIES (
    clustering_key='day_date.asc'
  )
""")

### Pad Data

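The following cell renames household_id to id to match the table's partition key, selects the feature columns, and replaces null values with 0 for data quality. Alternatively, Keyspaces can represent a missing value by leaving the column unset; because this session sets spark.cassandra.output.ignoreNulls=true, any remaining nulls are left unset rather than written.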

In [None]:
df = df.selectExpr('household_id as id','day_date','energy_median','energy_mean','energy_max','energy_count','energy_std',\
               'energy_sum','energy_min','energy_sum_3months','energy_sum_6months','energy_sum_1yr',\
               'energy_count_3months','energy_count_6months','energy_count_1yr','energy_max_3months',\
               'energy_max_6months','energy_max_1yr','energy_mean_3months','energy_mean_6months','energy_mean_1yr',\
               'energy_stddev_3months','energy_stddev_6months','energy_stddev_1yr').fillna(0)

print("Records in Feature Dataset: {0:,}".format(df.count()))

#### Write the DataFrame to the offline store (Delta Lake)

Now that we have developed several features and prepared the data, we store the features in the offline store. You use Spark and Delta Lake to persist the features, partitioned by day_date, under a prefix in your S3 bucket.


In [None]:
# Write the features as a Delta Lake table on Amazon S3 (the offline store),
# overwriting the table's current contents.

df.write.format("delta")\
        .mode("overwrite")\
        .partitionBy('day_date')\
        .save("s3://800S3BUCKET/offline-feature-store")

print("Saved data to offline store")

#### Write the DataFrame to the online store in Amazon Keyspaces

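Next, we move the features to the online store in Amazon Keyspaces. Glue and Spark provide at-least-once guarantees for data persistence: if the job completes, the writes to both the online and offline stores have been delivered at least once. Repeated deliveries are harmless in Keyspaces because every write is an upsert on the row's primary key (here, id and day_date), so a retried write overwrites the same row instead of creating a duplicate. You can also run a separate anti-entropy job that compares the features in both stores, for example with the spark-extension library loaded in the session configuration.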

In [None]:
df.write.format("org.apache.spark.sql.cassandra").mode("append").option("keyspace", '800KEYSPACE').option("table", 'energy_data_features').save()

print("Saved data to online store")

### Simulate random reads

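To test the online store's read performance, we look up the features by key (id and day_date) in random order. Because the session sets directJoinSetting=on, the Spark Cassandra Connector can execute this join as a direct join, reading each matching row from Keyspaces by its primary key instead of scanning the whole table. After the job runs, you can check read latencies in Amazon CloudWatch.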

In [None]:
from pyspark.sql.functions import rand 

tableDf = spark.read.format("org.apache.spark.sql.cassandra").option("keyspace", '800KEYSPACE').option("table", 'energy_data_features').load() 

cond = [tableDf.id == df.id, tableDf.day_date == df.day_date]

totalreads = df.orderBy(rand()).join(tableDf, cond, "left").count()

print("Records in Feature Dataset: {0:,}".format(totalreads))

### Conclusion

AWS Glue, Amazon S3, and Amazon Keyspaces provide a flexible and scalable way to store, share, and manage ML model features for training and inference, and they promote feature reuse across ML applications. Glue, S3, and Keyspaces are serverless data services: developers can build in isolation with full capabilities, and teams can scale production services with virtually unbounded capacity. You can ingest features from streaming and batch data sources, such as application logs, service logs, clickstreams, sensors, and tabular data from AWS or third-party sources, and then use Glue to transform the data into ML features and build feature pipelines that support MLOps practices and shorten the time to model deployment.


