## Ingest Orchestration via SageMaker Processing using PySpark

In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.spark.processing import PySparkProcessor

#### Application code that uses PySpark to ingest data into Feature Store. It runs inside the SageMaker Processing container.

In [None]:
%%writefile fs_pyspark_ingest.py
import pyspark
from pyspark.sql import SparkSession, DataFrame
import os
import sys
import argparse
import time
import boto3
import numpy as np
from botocore.config import Config


def _row_to_record(row):
    """Convert one Spark ``Row`` into a Feature Store ``PutRecord`` record.

    Each field of ``row`` becomes a ``{"FeatureName", "ValueAsString"}``
    entry whose value is ``str(value)``; fields whose value is ``None`` are
    omitted from the record.
    """
    return [
        {"FeatureName": column, "ValueAsString": str(row[column])}
        for column in row.__fields__
        if row[column] is not None
    ]


def ingest_df_into_fg(feature_group_name, rows):
    """Write every row of one Spark partition to a Feature Store feature group.

    Meant to be passed to ``DataFrame.foreachPartition``, so it builds its own
    boto3 client on each call instead of sharing one from the driver.

    Args:
        feature_group_name: Name of the target feature group.
        rows: Iterable of ``pyspark.sql.Row`` objects for one partition.

    Raises:
        RuntimeError: If ``PutRecord`` returns a non-200 HTTP status.
        Errors raised by ``put_record`` itself (after botocore's retries)
        propagate unchanged.
    """
    rows = list(rows)
    # Empty partitions are normal in Spark (e.g. after ``repartition`` into
    # more partitions than there are rows); there is simply nothing to write.
    if not rows:
        return

    featurestore_runtime = boto3.Session().client(
        service_name='sagemaker-featurestore-runtime',
        config=Config(retries={'max_attempts': 10, 'mode': 'standard'}),
    )

    for row in rows:
        resp = featurestore_runtime.put_record(
            FeatureGroupName=feature_group_name,
            Record=_row_to_record(row),
        )
        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise RuntimeError(f'PutRecord failed: {resp}')

def run_pyspark_job(args):
    """Read the CSV input from S3 and ingest every row into the feature group.

    Args:
        args: Namespace from ``parse_args`` providing ``s3_uri_prefix``,
            ``feature_group_name`` and optionally ``custom_partitions``.

    Raises:
        ValueError: If the input contains no data rows.
    """
    spark = SparkSession.builder.appName('PySparkJob').getOrCreate()
    try:
        df = spark.read.options(Header=True).csv(args.s3_uri_prefix)

        # Check emptiness once, on the driver: individual partitions can
        # legitimately be empty, so only the whole input is meaningful here.
        if not df.head(1):
            raise ValueError(f'Empty data frame: no rows under {args.s3_uri_prefix}')

        if args.custom_partitions:
            df = df.repartition(args.custom_partitions)

        # Bind to a local so the closure shipped to executors captures only
        # this string rather than the whole argparse namespace.
        feature_group_name = args.feature_group_name
        df.foreachPartition(lambda rows: ingest_df_into_fg(feature_group_name, rows))
    finally:
        spark.stop()
    

def parse_args(argv=None):
    """Parse the job's command-line arguments.

    Args:
        argv: Argument list to parse; when None, argparse falls back to
            ``sys.argv[1:]``. Unrecognized arguments are ignored.

    Returns:
        argparse.Namespace with ``custom_partitions`` (int or None),
        ``s3_uri_prefix`` and ``feature_group_name``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--custom_partitions", type=int,
                        help="Repartition the input into this many partitions before ingesting.")
    parser.add_argument("--s3_uri_prefix", type=str, required=True,
                        help="S3 URI prefix of the CSV input files.")
    parser.add_argument("--feature_group_name", type=str, required=True,
                        help="Name of the Feature Store feature group to ingest into.")

    # parse_known_args tolerates extra arguments instead of exiting on them.
    args, _ = parser.parse_known_args(argv)
    return args

if __name__ == '__main__':
    # Script entry point: parse the command line, then run the ingestion job.
    run_pyspark_job(parse_args())

#### Set the S3 location of the chunked input files and the feature group name

In [None]:
# Key into ``instance_configs`` (next cell) selecting the cluster sizing.
run_config = '10M'
# S3 prefix holding the chunked CSV input files read by the Spark job.
s3_uri_prefix = 's3://fs-pyspark-dbg/data/10M/'
# Feature group the job writes into (presumably must already exist).
feature_group_name = 'ingest-fg-06-17-2021-14-46-44'

## Sample configurations for SageMaker Processing
Each configuration is named after the number of records in the dataset (e.g. `10M` = 10 million records).

In [None]:
# Processing-cluster sizing per dataset size (key = number of records).
# ``custom_partitions`` is kept as a string because it is placed directly
# into the processing job's ``arguments`` list.
instance_configs = {
    '10M': dict(
        instance_type='ml.m5.4xlarge',
        instance_count=8,
        custom_partitions='512',
    ),
    '1M': dict(
        instance_type='ml.m5.4xlarge',
        instance_count=8,
        custom_partitions='128',
    ),
}

### Orchestrate ingestion using SageMaker Processing Job

Logs for SageMaker Processing jobs (replace `us-east-1` in the URL with your region): https://us-east-1.console.aws.amazon.com/sagemaker/home?region=us-east-1#/processing-jobs

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Resolve the role first, then look up the sizing for the selected run.
execution_role = get_execution_role()
run_settings = instance_configs[run_config]

pyspark_processor = PySparkProcessor(
    framework_version='2.4',
    role=execution_role,
    instance_type=run_settings['instance_type'],
    instance_count=run_settings['instance_count'],
    # Forward the notebook's region to the job (presumably so boto3 inside
    # the container resolves the same region).
    env={'AWS_DEFAULT_REGION': boto3.Session().region_name},
)

# Command line consumed by parse_args() in fs_pyspark_ingest.py.
job_arguments = [
    '--s3_uri_prefix', s3_uri_prefix,
    '--feature_group_name', feature_group_name,
    '--custom_partitions', run_settings['custom_partitions'],
]

pyspark_processor.run(
    submit_app='fs_pyspark_ingest.py',
    arguments=job_arguments,
    spark_event_logs_s3_uri='s3://fs-ingest-2/spark-logs',
    # Do not stream job logs into the notebook; use the console link above.
    logs=False,
)

Launch the Spark History Server: https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html#spark-history-server

In [None]:
# Start the Spark History Server for the processing run above; presumably it
# serves the event logs written to spark_event_logs_s3_uri (see the link above).
pyspark_processor.start_history_server()