# Module 5: Offline Batch ingestion via SageMaker Processing job using Feature Store Spark Connector

---

**Note:** Please set the kernel to `Python 3 (Data Science)` and select the `ml.t3.medium` instance type.

Before you start, run the notebook [m5_nb0_partition_data](https://github.com/aws-samples/amazon-sagemaker-feature-store-end-to-end-workshop/blob/main/05-module-scalable-batch-ingestion/m5_nb0_partition_data.ipynb) from this section of the workshop to set up the required data.

## Contents

1. [Setup](#Setup)
1. [Extending a PySpark Container and pushing to ECR](#Extending-a-PySpark-Container-and-pushing-to-ECR)
1. [Create PySpark SageMaker Processing script](#Create-PySpark-SageMaker-Processing-script)
1. [Run batch ingestion job](#Run-batch-ingestion-job)
1. [Verify processing job results](#Verify-processing-job-results)



In this example, we explore an alternative batch ingestion route that uses PySpark Processing containers to ingest data directly into the Offline Store. We use the `.ingest_data()` API of the Feature Store Spark connector instead of the `.put_record()` API.
This bypasses the Online Store and saves cost when only the Offline Store is needed.
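For comparison, the `.put_record()` route sends one record per API call, with every feature serialized as a `FeatureName`/`ValueAsString` pair; the ingestion script's `transform_row` helper builds this list from a Spark `Row`. The sketch below reproduces that conversion for a plain Python dict so it runs without Spark. The helper name `to_feature_store_record` and the sample values are hypothetical.

```python
from typing import Any, Dict, List


def to_feature_store_record(row: Dict[str, Any]) -> List[Dict[str, str]]:
    """Convert one row (column -> value) into the Record list expected by put_record.

    Every value is sent as a string, mirroring transform_row in the ingestion script.
    """
    return [{"FeatureName": name, "ValueAsString": str(value)} for name, value in row.items()]


# Hypothetical example row; with put_record, each such record costs one API call
example_row = {"order_id": "O1", "customer_id": "C7", "purchase_amount": 0.42, "is_reordered": 1}
record = to_feature_store_record(example_row)
print(record)
```

With `.ingest_data()` and only `OfflineStore` as the target, the connector instead batch-writes the whole DataFrame to the offline store, so no per-record API calls are needed.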


To achieve this, we need the [SageMaker Feature Store PySpark](https://pypi.org/project/sagemaker-feature-store-pyspark/) package. If you want to use Spark in a different environment, see the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-ingestion-spark-connector-setup.html) for further guidance.
Using the connector with a PySpark Processing container requires us to extend the PySpark Processing Docker image and push it to Amazon ECR so that all required packages are available.

If you want to run the Docker builds from SageMaker Studio, set it up as described in this [blog](https://aws.amazon.com/blogs/machine-learning/using-the-amazon-sagemaker-studio-image-build-cli-to-build-container-images-from-your-studio-notebooks/). Otherwise, run all code blocks marked with `docker needed` in an environment with internet access and Docker installed.

# Setup

#### Imports 

In [None]:
from sagemaker.spark.processing import PySparkProcessor
from sagemaker import get_execution_role
from random import randint
import sagemaker
import logging
import boto3
import json

In [None]:
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

In [None]:
logger.info(f'Using SageMaker version: {sagemaker.__version__}')

#### Essentials

In [None]:
try:
    role = get_execution_role()
except Exception:
    # for local dev, please set your SageMaker role here
    role = 'arn:aws:iam::XXXXXXXX:role/service-role/role-name'
logger.info(f'Role = {role}')
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
featurestore_runtime_client = sagemaker_session.boto_session.client('sagemaker-featurestore-runtime', region_name=region)
default_bucket = sagemaker_session.default_bucket()
logger.info(f'Default bucket = {default_bucket}')
prefix = 'sagemaker-feature-store'

# Extending a PySpark Container and pushing to ECR

To use the `FeatureStoreManager` as shown in the documentation, the PySpark Processing container needs to be extended.

This requires access to Docker. In the following section, we use plain Docker commands rather than the SageMaker Studio Image Build CLI.

In [None]:
# create a folder for our Docker files (-p avoids an error if it already exists)
!mkdir -p docker

In [None]:
%%writefile docker/Dockerfile

# Docker image of the current PySpark processing container with spark 3.2, python 3.9 and optimized for CPU
FROM 173754725891.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-processing:3.2-cpu-py39-v1.0

# Upgrade pip 
RUN pip3 install --upgrade pip

# Install the sagemaker feature store pyspark connector 
RUN pip3 install sagemaker-feature-store-pyspark-3.2 --no-binary :all: --verbose

# Send logs direct to the terminal
ENV PYTHONUNBUFFERED=TRUE

# Set the entrypoint 
ENTRYPOINT [ "smspark-submit", "processing.py" ]

In [None]:
account_id = boto3.client('sts').get_caller_identity().get('Account')
tag = ':latest'
ecr_repository = 'pyspark-feature-store-spark-batch-ingestion'
processing_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}'.format(account_id, region, ecr_repository + tag)
region, ecr_repository, processing_repository_uri
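The repository URI follows the pattern `<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>`. A small helper such as the hypothetical `build_ecr_image_uri` below makes that pattern explicit and keeps the tag separate from the repository name. The account ID and region in the example are placeholders.

```python
def build_ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Return the full ECR image URI for a repository and tag (hypothetical helper)."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# Placeholder account ID and region
uri = build_ecr_image_uri("123456789012", "eu-west-1", "pyspark-feature-store-spark-batch-ingestion")
print(uri)
# → 123456789012.dkr.ecr.eu-west-1.amazonaws.com/pyspark-feature-store-spark-batch-ingestion:latest
```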

### Building the Docker image with regular Docker

The following commands only work with Docker running on your current machine.
If you run on SageMaker Studio, use the `sm-docker` CLI instead. This [blog](https://aws.amazon.com/blogs/machine-learning/using-the-amazon-sagemaker-studio-image-build-cli-to-build-container-images-from-your-studio-notebooks/) describes how to set it up.


In [None]:
# docker needed

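# log in to your own ECR registry, which the image is pushed to
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.amazonaws.com 
# log in to the registry hosting the base image; the Dockerfile pulls it from us-east-1
!aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 173754725891.dkr.ecr.us-east-1.amazonaws.com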
!docker build -t $ecr_repository . --file docker/Dockerfile
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $processing_repository_uri
!docker push $processing_repository_uri

Verify that the image was pushed to ECR as expected:

In [None]:
ecr_client = boto3.client('ecr')
ecr_client.describe_images(repositoryName=ecr_repository)
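`describe_images` returns a dict whose `imageDetails` list holds one entry per image, with the pushed tags under `imageTags`. Instead of reading the raw output, you could check for the expected tag with a helper like the hypothetical `has_tag` below. The sample response is hand-written and only mimics the shape of the real one.

```python
from typing import Any, Dict


def has_tag(describe_images_response: Dict[str, Any], tag: str = "latest") -> bool:
    """Return True if any image in an ECR describe_images response carries the given tag."""
    for image in describe_images_response.get("imageDetails", []):
        # Untagged images have no 'imageTags' key, so default to an empty list
        if tag in image.get("imageTags", []):
            return True
    return False


# Hand-written sample that mimics the response shape (not real output)
sample_response = {
    "imageDetails": [
        {"imageDigest": "sha256:abc", "imageTags": ["latest"]},
        {"imageDigest": "sha256:def"},
    ]
}
print(has_tag(sample_response))
```

In the notebook this would be `assert has_tag(ecr_client.describe_images(repositoryName=ecr_repository))`. The response is paginated (`nextToken`), so for repositories with many images this only inspects the first page.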

# Create PySpark SageMaker Processing script

In [1]:
%%writefile ./scripts/batch_ingest_sm_pyspark.py
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager
from pyspark.sql.functions import udf, datediff, to_date, lit, col,isnan, when, count
from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField, StringType, FloatType
from pyspark.sql import SparkSession, DataFrame
from argparse import Namespace, ArgumentParser
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline
from datetime import datetime
import argparse
import ast
import logging
import boto3
import time
import os


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())



def transform_row(row) -> list:
    columns = list(row.asDict())
    record = []
    for column in columns:
        feature = {'FeatureName': column, 'ValueAsString': str(row[column])}
        record.append(feature)
    return record

def ingest_to_feature_store(args: argparse.Namespace, rows) -> None:
    feature_group_name = args.feature_group_name
    session = boto3.session.Session()
    featurestore_runtime_client = session.client(service_name='sagemaker-featurestore-runtime')
    rows = list(rows)
    logger.info(f'Ingesting {len(rows)} rows into feature group: {feature_group_name}')
    for row in rows:
        record = transform_row(row)
        response = featurestore_runtime_client.put_record(FeatureGroupName=feature_group_name, Record=record)
        assert response['ResponseMetadata']['HTTPStatusCode'] == 200

def batch_ingest_to_feature_store(args: argparse.Namespace, df: DataFrame) -> None:
    feature_group_name = args.feature_group_name
    logger.info(f'Feature Group name supplied is: {feature_group_name}')

    logger.info('Instantiating FeatureStoreManager!')
    feature_store_manager = FeatureStoreManager()

    logger.info('Trying to load data types directly from the DataFrame')

    # Load the feature definitions from the input schema. The feature definitions can be used to create a feature group
    feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df)
    logger.info('Feature definitions loaded successfully!')
    logger.info(feature_definitions)
    feature_group_arn = args.feature_group_arn
    logger.info(f'Feature Group ARN supplied is: {feature_group_arn}')

    # The target stores arrive as a string such as "['OfflineStore']"; parse it into a Python list.
    # If only OfflineStore is selected, the connector batch-writes the data directly to the offline store
    target_stores = ast.literal_eval(args.target_feature_store_list)
    logger.info(f'Ingesting into the following stores: {target_stores}')

    feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=target_stores)
    logger.info('Feature ingestion successful!')


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_processes', type=int, default=1)
    parser.add_argument('--num_workers', type=int, default=1)
    parser.add_argument('--feature_group_name', type=str)
    parser.add_argument('--feature_group_arn', type=str)
    parser.add_argument('--target_feature_store_list', type=str)
    parser.add_argument('--s3_uri_prefix', type=str)
    
    args, _ = parser.parse_known_args()
    return args

def check_data_quality(df: DataFrame) -> None:
    # Sanity-checking section
    # df.show() prints to stdout and returns None, so call it outside the log message
    logger.info('First 5 rows of the dataframe for inspection:')
    df.show(5)

    # count null/NaN values per column
    df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
    ).show()

    # checking for categorical columns
    categorical_cols = [field for (field, dataType) in df.dtypes if dataType == 'string']
    logger.info(f'Categorical columns: {categorical_cols}')

    # checking for numerical columns
    numerical_cols = [field for (field, dataType) in df.dtypes if dataType in ('double', 'int', 'float')]
    logger.info(f'Numerical columns: {numerical_cols}')

    # checking for boolean columns
    boolean_cols = [field for (field, dataType) in df.dtypes if dataType == 'boolean']
    logger.info(f'Boolean columns: {boolean_cols}')

    # checking for date columns
    date_cols = [field for (field, dataType) in df.dtypes if dataType == 'date']
    logger.info(f'Date columns: {date_cols}')


def scale_col(df: DataFrame, col_name: str) -> DataFrame:
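    # UDF that extracts the single value from the scaled vector and rounds it to 2 decimals
    unlist = udf(lambda x: round(float(list(x)[0]), 2), DoubleType())
    # scale the column col_name with a MinMaxScaler, then replace the original column with the scaled one
    assembler = VectorAssembler(inputCols=[col_name], outputCol=f'{col_name}_vec')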
    scaler = MinMaxScaler(inputCol=f'{col_name}_vec', outputCol=f'{col_name}_scaled')
    pipeline = Pipeline(stages=[assembler, scaler])
    df = pipeline.fit(df).transform(df).withColumn(f'{col_name}_scaled', unlist(f'{col_name}_scaled')) \
                                       .drop(f'{col_name}_vec')
    df = df.drop(col_name)
    df = df.withColumnRenamed(f'{col_name}_scaled', col_name)
    return df

def ordinal_encode_col(df: DataFrame, col_name: str) -> DataFrame:
    indexer = StringIndexer(inputCol=col_name, outputCol=f'{col_name}_new')
    df = indexer.fit(df).transform(df)
    df = df.drop(col_name)
    df = df.withColumnRenamed(f'{col_name}_new', col_name)
    return df


def run_spark_job():

    args = parse_args()
    # add further packages as needed
    pkg_list = []
    pkg_list.append("software.amazon.sagemaker.featurestore:sagemaker-feature-store-spark-sdk_2.12:1.1.0")
    packages = ",".join(pkg_list)

    logger.info(f'Added the following packages to Spark: {packages}')

    spark = SparkSession.builder.appName("PySparkJobFeatureStore") \
        .config("spark.jars.packages", packages) \
        .getOrCreate()
    
    # set the time parser policy to LEGACY to allow parsing dates in the format dd/MM/yyyy HH:mm:ss, which avoids backwards-compatibility issues with Spark 2.4
    spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

    logger.info(f'Using Spark-Version:{spark.version}')

    # get the total number of cores in the Spark cluster; if developing locally, there might be no executor
    try:
        spark_context = spark.sparkContext
        total_cores = int(spark_context._conf.get('spark.executor.instances')) * int(spark_context._conf.get('spark.executor.cores'))
        logger.info(f'Total available cores in the Spark cluster = {total_cores}')
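    except (TypeError, ValueError):
        # spark.executor.instances / spark.executor.cores are not set (e.g. local mode), so int(None) fails
        total_cores = 1
        logger.info('Could not retrieve number of total cores. Setting total cores to 1')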
    
    logger.info(f'Reading input file from S3. S3 uri is {args.s3_uri_prefix}')

    # define the schema of the input data
    csvSchema = StructType([
        StructField("order_id", StringType(), True),
        StructField("customer_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("purchase_amount", FloatType(), False),
        StructField("is_reordered", IntegerType(), False),
        StructField("purchased_on", StringType(), False),
        StructField("event_time", StringType(), False)])


    # read the pyspark dataframe with a schema 
    df = spark.read.option("header", "true").schema(csvSchema).csv(args.s3_uri_prefix)  
    
    # check the data quality of the dataframe and write findings to logs for inspection 
    check_data_quality(df)

    # transform 1 - encode boolean to int
    df = ordinal_encode_col(df, 'is_reordered')
    df = df.withColumn('is_reordered', df['is_reordered'].cast(IntegerType()))

    # transform 2 - min max scale `purchase_amount`
    df = df.withColumn('purchase_amount', df['purchase_amount'].cast(DoubleType()))
    df = scale_col(df, 'purchase_amount')
    
    # transform 3 - derive `n_days_since_last_purchase` column using the `purchased_on` col
    current_date = datetime.today().strftime('%Y-%m-%d')
    df = df.withColumn('n_days_since_last_purchase', datediff(to_date(lit(current_date)), to_date('purchased_on', 'yyyy-MM-dd')))
    df = df.drop('purchased_on')
    df = scale_col(df, 'n_days_since_last_purchase')
    
    
    logger.info(f'Number of partitions = {df.rdd.getNumPartitions()}')
    # Rule of thumb heuristic - rely on the product of #executors by #executor.cores, and then multiply that by 3 or 4
    df = df.repartition(total_cores * 3)
    logger.info(f'Number of partitions after re-partitioning = {df.rdd.getNumPartitions()}')
    logger.info(f'Feature Store ingestion start: {datetime.now().strftime("%m/%d/%Y, %H:%M:%S")}')
    batch_ingest_to_feature_store(args, df)
    logger.info(f'Feature Store ingestion complete: {datetime.now().strftime("%m/%d/%Y, %H:%M:%S")}')

if __name__ == '__main__':
    logger.info('BATCH INGESTION - STARTED')
    run_spark_job()
    logger.info('BATCH INGESTION - COMPLETED')


Overwriting ./scripts/batch_ingest_sm_pyspark.py
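To sanity-check what the three Spark transforms produce, the following plain-Python sketch computes the same quantities on small lists: frequency-based ordinal encoding (what `StringIndexer` does with its default `frequencyDesc` ordering, ties broken alphabetically), min-max scaling to [0, 1] rounded to two decimals as in `scale_col`, and the day difference behind `n_days_since_last_purchase`. It is a reference for small examples only, not a replacement for the Spark job; the function names and sample values are hypothetical.

```python
from collections import Counter
from datetime import date
from typing import Dict, List


def ordinal_encode(values: List[str]) -> List[float]:
    """Map each label to its rank by descending frequency (ties: alphabetical), like StringIndexer's default."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    index: Dict[str, float] = {label: float(i) for i, label in enumerate(ordered)}
    return [index[v] for v in values]


def min_max_scale(values: List[float]) -> List[float]:
    """Rescale to [0, 1] and round to 2 decimals, as scale_col does via MinMaxScaler."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # MinMaxScaler maps a constant column to the middle of the target range
        return [0.5 for _ in values]
    return [round((v - lo) / (hi - lo), 2) for v in values]


def days_since(purchased_on: str, today: date) -> int:
    """Days between a 'yyyy-MM-dd' date string and today, like datediff(today, purchased_on)."""
    return (today - date.fromisoformat(purchased_on)).days


# Hypothetical sample values
print(ordinal_encode(["1", "0", "1", "1"]))  # the most frequent label gets 0.0
print(min_max_scale([10.0, 20.0, 40.0]))
print(days_since("2023-01-01", date(2023, 1, 31)))
```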


In [None]:
!mkdir -p jars
# download the connector jar into the jars folder; make sure the version matches the package
# version configured in the Spark script (1.1.0 here) and check for newer releases
!wget -P jars https://repo1.maven.org/maven2/software/amazon/sagemaker/featurestore/sagemaker-feature-store-spark-sdk_2.12/1.1.0/sagemaker-feature-store-spark-sdk_2.12-1.1.0.jar
# rename the jar to the shorter name sagemaker-feature-store-spark-sdk.jar for convenience
!mv jars/sagemaker-feature-store-spark-sdk_2.12-1.1.0.jar jars/sagemaker-feature-store-spark-sdk.jar
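The jar downloaded here and the `spark.jars.packages` coordinate in the script (`software.amazon.sagemaker.featurestore:sagemaker-feature-store-spark-sdk_2.12:1.1.0`) should refer to the same version. One way to keep them in sync is to derive both from a single version string, as in the sketch below. It assumes the standard Maven Central directory layout (`<group path>/<artifact>/<version>/<artifact>-<version>.jar`); the helper names are hypothetical.

```python
GROUP_ID = "software.amazon.sagemaker.featurestore"
ARTIFACT_ID = "sagemaker-feature-store-spark-sdk_2.12"
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_coordinate(version: str) -> str:
    """Coordinate string in the group:artifact:version form used by spark.jars.packages."""
    return f"{GROUP_ID}:{ARTIFACT_ID}:{version}"


def maven_jar_url(version: str) -> str:
    """Download URL following the Maven Central layout: group path / artifact / version / jar."""
    group_path = GROUP_ID.replace(".", "/")
    return f"{MAVEN_CENTRAL}/{group_path}/{ARTIFACT_ID}/{version}/{ARTIFACT_ID}-{version}.jar"


connector_version = "1.1.0"
print(maven_coordinate(connector_version))
print(maven_jar_url(connector_version))
```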

# Run batch ingestion job

In [None]:
%store -r orders_feature_group_name
s3_uri_prefix = f's3://{default_bucket}/{prefix}/partitions/*'
# REUSE orders feature group name from module 1
feature_group_name = orders_feature_group_name 
feature_group_name 

In [None]:
sm_client=boto3.client('sagemaker')

feature_group_description = sm_client.describe_feature_group(FeatureGroupName=feature_group_name)
feature_group_arn = feature_group_description['FeatureGroupArn']

# Specify the target stores to ingest into. Script arguments are passed as strings, so the list is
# written as a string here and parsed back into a list with ast.literal_eval in the script.
target_feature_store_list = "['OfflineStore']"  # use "['OfflineStore', 'OnlineStore']" for both

feature_group_name, feature_group_arn, target_feature_store_list, s3_uri_prefix
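Because processing-job arguments reach the script as plain strings, the list of target stores has to survive a string round trip. The sketch below serializes a list with `repr`, parses it with the same `argparse` pattern the script uses (`parse_known_args`, which ignores unknown flags), and turns it back into a list with `ast.literal_eval`, as `batch_ingest_to_feature_store` does. The hand-built argument list stands in for what the processing job passes to the script.

```python
import argparse
import ast

# Serialize the Python list to a string, since script arguments can only be strings
target_stores = ["OfflineStore"]
serialized = repr(target_stores)  # "['OfflineStore']"

# Parse it the same way the ingestion script does; unknown flags are ignored
parser = argparse.ArgumentParser()
parser.add_argument("--target_feature_store_list", type=str)
args, _ = parser.parse_known_args(["--target_feature_store_list", serialized, "--unrelated", "x"])

# literal_eval only evaluates Python literals (lists, strings, numbers, ...), unlike eval()
parsed = ast.literal_eval(args.target_feature_store_list)
print(parsed, type(parsed).__name__)
```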

In [None]:
feature_group_description["FeatureDefinitions"]
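`FeatureDefinitions` lists each feature with a `FeatureType` of `Integral`, `Fractional`, or `String`. Before running the job, it can help to compare these against the columns the script produces. The sketch below does that for a list of `(column, Spark dtype)` pairs as returned by `DataFrame.dtypes`. The dtype-to-type mapping is an assumption for illustration, `find_schema_mismatches` is a hypothetical helper, and both example inputs are transcribed by hand from the script's schema and transforms, not taken from a real feature group.

```python
from typing import Dict, List, Tuple

# Assumed mapping from Spark dtype strings (as in DataFrame.dtypes) to Feature Store types
SPARK_TO_FEATURE_TYPE = {
    "int": "Integral",
    "bigint": "Integral",
    "float": "Fractional",
    "double": "Fractional",
    "string": "String",
}


def find_schema_mismatches(spark_dtypes: List[Tuple[str, str]],
                           feature_definitions: List[Dict[str, str]]) -> List[str]:
    """Describe every column whose presence or type differs between the DataFrame and the feature group."""
    expected = {d["FeatureName"]: d["FeatureType"] for d in feature_definitions}
    actual = {name: SPARK_TO_FEATURE_TYPE.get(dtype, f"unmapped:{dtype}") for name, dtype in spark_dtypes}
    problems = []
    for name in sorted(expected.keys() | actual.keys()):
        if name not in actual:
            problems.append(f"{name}: missing from DataFrame")
        elif name not in expected:
            problems.append(f"{name}: not defined in feature group")
        elif expected[name] != actual[name]:
            problems.append(f"{name}: feature group has {expected[name]}, DataFrame gives {actual[name]}")
    return problems


# Hand-written example (not real output): dtypes the script's transforms are expected to produce
script_dtypes = [("order_id", "string"), ("customer_id", "string"), ("product_id", "string"),
                 ("event_time", "string"), ("is_reordered", "int"),
                 ("purchase_amount", "double"), ("n_days_since_last_purchase", "double")]
sample_definitions = [{"FeatureName": n, "FeatureType": t} for n, t in [
    ("order_id", "String"), ("customer_id", "String"), ("product_id", "String"),
    ("event_time", "String"), ("is_reordered", "Integral"),
    ("purchase_amount", "Fractional"), ("n_days_since_last_purchase", "Fractional")]]
print(find_schema_mismatches(script_dtypes, sample_definitions))
```

In the notebook, `feature_group_description["FeatureDefinitions"]` would be passed as the second argument.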

In [None]:
from sagemaker.spark.processing import PySparkProcessor

pyspark_processor = PySparkProcessor(
    base_job_name="spark-preprocessor",
    image_uri = processing_repository_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

pyspark_processor.run(submit_app='./scripts/batch_ingest_sm_pyspark.py', 
                      arguments = ['--feature_group_name', feature_group_name, 
                                   '--s3_uri_prefix', s3_uri_prefix,
                                   '--feature_group_arn', feature_group_arn,
                                   '--target_feature_store_list', target_feature_store_list],
                      submit_jars=["jars/sagemaker-feature-store-spark-sdk.jar"],
                      spark_event_logs_s3_uri=f's3://{default_bucket}/spark-logs', 
                      logs=False,
                      wait=True)  # set logs=True to stream the job logs to this notebook
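The script repartitions the DataFrame to three partitions per available core, falling back to one core when the executor settings cannot be read. The sketch below restates that arithmetic in plain Python. The configuration dict stands in for the Spark conf values `spark.executor.instances` and `spark.executor.cores`; `target_partitions` is a hypothetical helper, and the numbers in the example are illustrative, not what SageMaker configures for `ml.m5.xlarge`.

```python
from typing import Mapping, Optional


def target_partitions(conf: Mapping[str, Optional[str]], partitions_per_core: int = 3) -> int:
    """Return executors * cores-per-executor * partitions_per_core, using 1 core if the conf is incomplete."""
    try:
        total_cores = int(conf.get("spark.executor.instances")) * int(conf.get("spark.executor.cores"))
    except (TypeError, ValueError):
        # int(None) raises TypeError when a setting is missing (e.g. local mode)
        total_cores = 1
    return total_cores * partitions_per_core


# Hypothetical configurations
print(target_partitions({"spark.executor.instances": "2", "spark.executor.cores": "4"}))
print(target_partitions({}))
```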

# Verify processing job results

In [None]:
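# get_record reads from the Online Store. If this job only ingested into the OfflineStore,
# a returned record comes from earlier online ingestion, not from this job.
# The randomly chosen order_id may also not exist, in which case the response may contain no 'Record' key.
order_id = f'O{randint(1, 100000)}'
logger.info(f'order_id={order_id}')
print(feature_group_name)
feature_record = featurestore_runtime_client.get_record(FeatureGroupName=feature_group_name, 
                                                        RecordIdentifierValueAsString=order_id)
print(json.dumps(feature_record, indent=2))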
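`get_record` returns the features under `Record` as a list of `FeatureName`/`ValueAsString` pairs, which is hard to scan in raw JSON. The hypothetical helper below flattens that list into a dict and returns `None` when the response has no `Record` key (for example, when the random `order_id` does not exist in the Online Store). The sample response is hand-written and only mimics the real shape.

```python
from typing import Any, Dict, Optional


def record_to_dict(get_record_response: Dict[str, Any]) -> Optional[Dict[str, str]]:
    """Flatten a get_record response into {feature_name: value_as_string}, or None if no record was returned."""
    record = get_record_response.get("Record")
    if not record:
        return None
    return {feature["FeatureName"]: feature["ValueAsString"] for feature in record}


# Hand-written sample mimicking the response shape (not real output)
sample = {"Record": [{"FeatureName": "order_id", "ValueAsString": "O42"},
                     {"FeatureName": "purchase_amount", "ValueAsString": "0.17"}]}
print(record_to_dict(sample))
print(record_to_dict({"ResponseMetadata": {"HTTPStatusCode": 200}}))
```

In the notebook, `record_to_dict(feature_record)` gives a compact view of the fetched record.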

In [None]:
import pyathena as pa
import pandas as pd

# getting the latest feature group description
feature_group_description = sm_client.describe_feature_group(FeatureGroupName=feature_group_name)

# Opening a connection to Athena
conn = pa.connect(s3_staging_dir=f's3://{default_bucket}/athena-staging',
                  region_name=region)

# Getting the database and table name from the feature group description
data_catalog_config = feature_group_description['OfflineStoreConfig']['DataCatalogConfig']
database = data_catalog_config['Database']
table_name = data_catalog_config['TableName']

# Querying the table
query = f"""SELECT * FROM "{database}"."{table_name}"
        ORDER BY "write_time" DESC
        LIMIT 1000;"""

df = pd.read_sql(query, conn)
df
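The offline store is append-only, so the same `order_id` can appear several times (for example, after re-running this job). To inspect only the most recent version of each record, a query along the lines of the sketch below could be used. It assumes `order_id` is the record identifier and `event_time` the event time feature of this feature group, and it relies on the `write_time` and `is_deleted` columns that Feature Store adds to offline store tables. Ordering by `event_time` uses the values as stored. `build_latest_records_query` is a hypothetical helper.

```python
def build_latest_records_query(database: str, table_name: str,
                               record_id: str = "order_id", event_time: str = "event_time",
                               limit: int = 1000) -> str:
    """Build an Athena SQL query returning the latest non-deleted row per record identifier."""
    return f"""
SELECT *
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY "{record_id}"
               ORDER BY "{event_time}" DESC, "write_time" DESC
           ) AS row_num
    FROM "{database}"."{table_name}"
)
WHERE row_num = 1
  AND NOT is_deleted
LIMIT {limit}
""".strip()


# Placeholder database and table names
print(build_latest_records_query("sagemaker_featurestore", "my_table"))
```

In the notebook, this could be run as `pd.read_sql(build_latest_records_query(feature_group_description['OfflineStoreConfig']['DataCatalogConfig']['Database'], table_name), conn)`.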