# From Delta Lake to Amazon SageMaker

[Delta Lake](https://delta.io/) is a common open-source framework used for storing data in Lakehouse architectures.

In this sample we demonstrate how to integrate Delta Tables with Amazon SageMaker for performing data exploration, ingestion, processing, training, and hosting for Machine Learning.

---

## 2 - Feature Engineering and Ingestion

***Use Kernel "Data Science 3.0 (Python 3)" for running this notebook***

In this notebook, we will ingest data from our Delta Tables, transform it with code running on **SageMaker Processing**, and ingest the resulting features into **SageMaker Feature Store**. For this purpose we will:
* Create a SageMaker Feature Store Feature Group, with both an offline and an online store
* Prepare a processing script for our feature engineering, including the configuration for connecting to our Delta Table
* Run a SageMaker Processing job pointing to our sample Delta Table location in Amazon S3. The job includes the code for the transformations and for ingesting the resulting features into our Feature Group

<center><img src="../images/DeltaLake_to_SageMaker_2.png" width="50%"></center>

Note that the transformations can also be performed with other AWS services. For example, for low-code/no-code processing you can rely on **SageMaker Data Wrangler**, which at the time of writing supports direct connections to Delta Lakes via JDBC for data exploration, analysis, and feature engineering. You can find more details about this method in this blog post:

https://aws.amazon.com/blogs/machine-learning/prepare-data-from-databricks-for-machine-learning-using-amazon-sagemaker-data-wrangler/

### Processing data from Delta Lake with SageMaker Processing

In [15]:
import boto3
import sagemaker
from sagemaker import get_execution_role
import pandas as pd
import os
from urllib.parse import urlparse

# S3 bucket for saving processing job outputs
sm_session = sagemaker.Session()
bucket = sm_session.default_bucket()
region = sm_session.boto_region_name

# Delta Table location (s3a:// scheme, as read by Spark) and output prefix - Replace these with your own if you want to customize this example
table = f's3a://{bucket}/delta_to_sagemaker/delta_format/'
output_path = f's3://{bucket}/delta_to_sagemaker/processing_output/'

role = get_execution_role()
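The Delta Table location uses the `s3a://` scheme because the processing script reads it through Spark and Hadoop's S3A connector (the script configures `fs.s3a.aws.credentials.provider`), while boto3 and the Feature Store `s3_uri` use plain `s3://` URIs. If you customize this example around a single S3 location, a small helper can derive one form from the other. The sketch below is a hypothetical helper (`with_scheme` is not part of any SDK) built on `urllib.parse`, which is already imported above:

```python
from urllib.parse import urlparse, urlunparse

def with_scheme(uri: str, scheme: str) -> str:
    """Return an S3 URI with its scheme swapped (e.g. 's3' <-> 's3a').

    Hypothetical helper: the bucket (netloc) and key (path) are kept unchanged.
    """
    parsed = urlparse(uri)
    if parsed.scheme not in ("s3", "s3a") or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    return urlunparse(parsed._replace(scheme=scheme))

# Example with a placeholder bucket name: derive the boto3-style URI from the Spark-style one
print(with_scheme("s3a://my-bucket/delta_to_sagemaker/delta_format/", "s3"))
```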

We will start by creating a SageMaker Feature Store Feature Group.

In [16]:
import time
current_time_sec = int(round(time.time()))

features = pd.read_csv('../data/fact_rating_synthetic.csv')
features.drop('timestamp', axis=1, inplace=True)
features.columns=['rowID', 'ratingID', 'userID', 'placeID', 'rating_overall', 'rating_food', 'rating_service']
features = features.astype({'rowID': 'string'})
features = features.astype({'userID': 'string'})
features["EventTime"] = pd.Series([current_time_sec] * len(features), dtype="float64")

print(features.dtypes)

rowID              string
ratingID            int64
userID             string
placeID             int64
rating_overall      int64
rating_food         int64
rating_service      int64
EventTime         float64
dtype: object
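Feature Store requires every Feature Group to have an event time feature, of type `Fractional` (Unix epoch seconds, as used here) or `String` (ISO-8601, e.g. `yyyy-MM-dd'T'HH:mm:ssZ`). The `EventTime` column added above is only used so that `load_feature_definitions` can infer its type; the processing script computes a fresh value for each record at ingestion time. A minimal sketch of both encodings, assuming timezone-aware datetimes (the function names are hypothetical):

```python
from datetime import datetime, timezone

def event_time_fractional(dt: datetime) -> float:
    """Fractional EventTime: seconds since the Unix epoch."""
    if dt.tzinfo is None:
        # A naive datetime would be interpreted in the machine's local time zone
        raise ValueError("Use a timezone-aware datetime")
    return dt.timestamp()

def event_time_iso(dt: datetime) -> str:
    """String EventTime: ISO-8601 in UTC, in the yyyy-MM-dd'T'HH:mm:ssZ form."""
    if dt.tzinfo is None:
        raise ValueError("Use a timezone-aware datetime")
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

now = datetime.now(timezone.utc)
print(event_time_fractional(now), event_time_iso(now))
```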


In [17]:
from sagemaker.feature_store.feature_group import FeatureGroup
feature_group_name = 'rating-fg'

feature_group = FeatureGroup(
    name=feature_group_name,
    sagemaker_session=sm_session,
)

feature_group.load_feature_definitions(features)

[FeatureDefinition(feature_name='rowID', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='ratingID', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='userID', feature_type=<FeatureTypeEnum.STRING: 'String'>),
 FeatureDefinition(feature_name='placeID', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='rating_overall', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='rating_food', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='rating_service', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='EventTime', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>)]
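`load_feature_definitions` infers each feature's type from the DataFrame's dtypes, which is why `rowID` and `userID` are cast to `string` beforehand (otherwise `rowID`, the record identifier, would be inferred as `Integral`). The sketch below reproduces only the mapping visible in the output above, for the three dtypes used in this notebook. It is a hypothetical illustration, not the SageMaker SDK's implementation, which handles more dtypes:

```python
# Mapping observed in the load_feature_definitions output above
PANDAS_TO_FEATURE_TYPE = {
    "string": "String",
    "int64": "Integral",
    "float64": "Fractional",
}

def feature_definitions(dtypes: dict) -> list:
    """Build describe()-style FeatureDefinitions from {column: dtype name}."""
    definitions = []
    for name, dtype in dtypes.items():
        if dtype not in PANDAS_TO_FEATURE_TYPE:
            raise ValueError(f"No Feature Store type for {name!r} (dtype {dtype!r})")
        definitions.append({"FeatureName": name, "FeatureType": PANDAS_TO_FEATURE_TYPE[dtype]})
    return definitions

# Usage with the DataFrame above:
# feature_definitions({col: str(dtype) for col, dtype in features.dtypes.items()})
```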

In [18]:
feature_group.create(
    s3_uri=f's3://{bucket}/delta_to_sagemaker/',
    role_arn=role,
    record_identifier_name='rowID',
    event_time_feature_name='EventTime',
    enable_online_store=True,
)

{'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:889960878219:feature-group/rating-fg',
 'ResponseMetadata': {'RequestId': '7384525e-831f-46dd-8888-e83598da9538',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7384525e-831f-46dd-8888-e83598da9538',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '86',
   'date': 'Mon, 30 Jan 2023 11:02:58 GMT'},
  'RetryAttempts': 0}}

To confirm that the Feature Group has been created, we use the `DescribeFeatureGroup` and `ListFeatureGroups` APIs to display it.

In [19]:
feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:889960878219:feature-group/rating-fg',
 'FeatureGroupName': 'rating-fg',
 'RecordIdentifierFeatureName': 'rowID',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'rowID', 'FeatureType': 'String'},
  {'FeatureName': 'ratingID', 'FeatureType': 'Integral'},
  {'FeatureName': 'userID', 'FeatureType': 'String'},
  {'FeatureName': 'placeID', 'FeatureType': 'Integral'},
  {'FeatureName': 'rating_overall', 'FeatureType': 'Integral'},
  {'FeatureName': 'rating_food', 'FeatureType': 'Integral'},
  {'FeatureName': 'rating_service', 'FeatureType': 'Integral'},
  {'FeatureName': 'EventTime', 'FeatureType': 'Fractional'}],
 'CreationTime': datetime.datetime(2023, 1, 30, 11, 2, 58, 516000, tzinfo=tzlocal()),
 'OnlineStoreConfig': {'EnableOnlineStore': True},
 'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': 's3://sagemaker-eu-west-1-889960878219/delta_to_sagemaker/',
   'ResolvedOutputS3Uri': 's3://sagemaker-eu-west-1-8

In [20]:
sm_session.boto_session.client(
    "sagemaker", region_name=region
).list_feature_groups(NameContains = 'rating-fg')

{'FeatureGroupSummaries': [{'FeatureGroupName': 'rating-fg',
   'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:889960878219:feature-group/rating-fg',
   'CreationTime': datetime.datetime(2023, 1, 30, 11, 2, 58, 516000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Creating'}],
 'NextToken': 'cIws2QhTXUIa8bi8VaXSkZyCip699fykY7CU0a6rxKatwtimPuAdGzYreBOG3pbaFVr5tTurbv5xARRYoVoagJZnPP02cK+IA+BY9AeyWiiF1KjDfjmXQeS30w08eRyPK1t5lfLAADVUz+FY7DuTh3XkdpxsdKqg3/fRdDZX0bipoRPK8tf5CuCN2co9lz9UViIUrqDpsigbyzkorDbMMmuw61OoLN2X9MeBQx38VttxsnQymiHvS/gugYXRaPRGP281mvSj2KAN6ZM14ET0VjKh1BB+Myvn4Sddqs7Up6TGGJmWKkPCh+hejOIv98DivOsiSNTF0BA5MR92qKKd/ZOPreRYgaHbZR0dorzN8v+2q02sKwswwYQFB4BrN6rr/7diXwkQ4l5nR3ocyPWVk9ZDZDX19VyJaXIN4LoMBCpkEY4Kf6+6j0vg2Jwi7FjOnZpzGBpGJrJ1rQWITGWn6Pw+puVl47TJniiGtBEiiUAJx5TK3t1red/48xU9y7G7Yt87uwdGDhiVK13FFT+nQy/epnqHMp4cj6X9jIMHhmGWu+B5E5vQBSAg2oR8i4wutNa1Cef80kcoAsLt2sOXWvi9IYHjkR8mWUE=',
 'ResponseMetadata': {'RequestId': '00d5d6ad-5009-4c8f-b7b6-71f9cd2d035c',
  'HTTPStatusCode
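The `list_feature_groups` output shows the Feature Group still in the `Creating` status, and ingestion should wait until it reaches `Created`. A minimal polling sketch, assuming only that `describe()` returns a dict with a `FeatureGroupStatus` key (as the `DescribeFeatureGroup` response does); the helper name and default timeouts are hypothetical:

```python
import time

def wait_for_feature_group(describe, poll_seconds: float = 5.0, timeout_seconds: float = 600.0) -> str:
    """Poll describe() until the Feature Group leaves 'Creating'; return the final status."""
    deadline = time.monotonic() + timeout_seconds
    status = describe().get("FeatureGroupStatus")
    while status == "Creating":
        if time.monotonic() > deadline:
            raise TimeoutError("Feature Group is still 'Creating' after the timeout")
        time.sleep(poll_seconds)
        status = describe().get("FeatureGroupStatus")
    if status != "Created":
        # e.g. 'CreateFailed': describe() also reports a 'FailureReason'
        raise RuntimeError(f"Feature Group ended in status {status!r}")
    return status

# Usage with the notebook objects:
# wait_for_feature_group(feature_group.describe)
```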

We can now define our SageMaker Processing job for performing the transformations on our data, and ingesting the results into our Feature Group.

In [21]:
from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="delta-to-sagemaker-",
    framework_version="3.1",
    role=role,
    instance_count=1,  # set to >1 for distributed Spark processing (Feature Store ingestion in the script still runs on the driver)
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

We will now create our pre-processing script, including code for reading our Delta Table, performing transformations, and ingesting the resulting features into SageMaker Feature Store.

Note that our example only includes some simple transformations recommended by Data Wrangler for our dataset; you can replace them with your own transformations if you want to customize this example.

In [32]:
%%writefile ./code/preprocessing.py
import argparse
import csv
import os
import shutil
import sys
import time
import boto3
import pandas as pd
from decimal import Decimal

# Import pyspark and build Spark session
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

# Defining some functions for efficiently ingesting into SageMaker Feature Store...
def remove_exponent(d):
    return d.quantize(Decimal(1)) if d == d.to_integral() else d.normalize()

def transform_row(columns, row) -> list:
    record = []
    for column in columns:
        if column != 'timestamp':
            try:
                feature = {'FeatureName': column, 'ValueAsString': str(remove_exponent(Decimal(str(row[column]))))}
            except Exception:
                # Non-numeric values (e.g. userID) are sent as plain strings
                feature = {'FeatureName': column, 'ValueAsString': str(row[column])}
            # Feature Store can't ingest null values, so missing features are skipped
            if str(row[column]) not in ['NaN', 'NA', 'None', 'nan', 'none']:
                record.append(feature)
    # Complete with EventTime feature: Unix epoch seconds, matching the Fractional EventTime definition
    timestamp = {'FeatureName': 'EventTime', 'ValueAsString': str(time.time())}
    record.append(timestamp)
    return record

def ingest_to_feature_store(fg, region, rows) -> None:
    session = boto3.session.Session(region_name=region)
    featurestore_runtime_client = session.client(service_name='sagemaker-featurestore-runtime')
    columns = rows.columns
    for index, row in rows.iterrows():
        record = transform_row(columns, row)
        #print(f'Putting record:{record}')
        response = featurestore_runtime_client.put_record(FeatureGroupName=fg, Record=record)
        #print(f'Done with row:{index}')
        assert response['ResponseMetadata']['HTTPStatusCode'] == 200

def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--region", type=str, help="AWS region")
    parser.add_argument("--table", type=str, help="Delta Table URL")
    parser.add_argument("--feature-group", type=str, help="Name of the Feature Group")
    parser.add_argument("--output-path", type=str, help="S3 prefix for storing resulting dataset")
    args = parser.parse_args()

    # Instantiate Spark via builder
    # Note: we use the `ContainerCredentialsProvider` to give us access to underlying IAM role permissions
    spark = (SparkSession
        .builder
        .appName("PySparkApp") 
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
        .config("fs.s3a.aws.credentials.provider",'com.amazonaws.auth.ContainerCredentialsProvider') 
        .getOrCreate())

    sc = spark.sparkContext
    print('Spark version: '+str(sc.version))
    
    s3a_delta_table_uri=args.table
    print(s3a_delta_table_uri)

    # Create SQL command inserting the S3 path location
    sql_cmd = f'SELECT * FROM delta.`{s3a_delta_table_uri}` ORDER BY timestamp'
    print(f'SQL command: {sql_cmd}')

    # Execute SQL command which returns dataframe
    sql_results = spark.sql(sql_cmd)
    print(type(sql_results))

    # ----------------
    # Transformations - Pandas code generated by sagemaker_datawrangler
    processed_features = sql_results.toPandas().copy(deep=True)

    # Replace disguised missing values ('na', 'nA') in userID with a generic value
    generic_value = 'Other'
    processed_features['userID'] = processed_features['userID'].replace(['na', 'nA'], generic_value)

    # Drop the ratingID column, flagged by Data Wrangler as an ID column
    processed_features = processed_features.drop(columns=['ratingID'])

    # Capture resulting data frame in Spark (SparkSession.createDataFrame replaces the deprecated SQLContext)
    df = spark.createDataFrame(processed_features)
    # ----------------
    
    # Write processed data after transformations (overwrite so the job can be re-run)
    processed_features_output_path = args.output_path + 'processed_features.csv'
    print("Saving processed features to {}".format(processed_features_output_path))
    df.write.mode("overwrite").csv(processed_features_output_path)
    
    # Ingesting the resulting data into our Feature Group...
    print(f"Ingesting processed features into Feature Group {args.feature_group}...")
    ingest_to_feature_store(args.feature_group, args.region, processed_features)
    print("All done.")


if __name__ == "__main__":
    main()

Overwriting ./code/preprocessing.py
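Each record sent by `put_record` is a list of `{'FeatureName': ..., 'ValueAsString': ...}` entries. `transform_row` uses `remove_exponent` so that numbers are sent in plain notation: a value such as `2.0` (for example from an integer column that pandas turned into floats because of missing values) is sent as `'2'`, and `1e20` is not sent as `'1E+20'`. Non-numeric values such as `userID` fall back to the raw string, and missing values are skipped. The standalone sketch below isolates that per-value logic so it can be tested outside Spark; `to_feature_value` is a hypothetical name and, unlike the script, it returns `None` for skipped values:

```python
from decimal import Decimal, InvalidOperation

# String forms treated as missing, as in transform_row
MISSING_TOKENS = {"NaN", "NA", "None", "nan", "none"}

def remove_exponent(d: Decimal) -> Decimal:
    # Same logic as in preprocessing.py: integral values lose their decimal part/exponent
    return d.quantize(Decimal(1)) if d == d.to_integral() else d.normalize()

def to_feature_value(name, value):
    """Return a Feature Store feature entry for one value, or None if it is missing."""
    raw = str(value)
    if raw in MISSING_TOKENS:
        return None
    try:
        text = str(remove_exponent(Decimal(raw)))
    except InvalidOperation:
        # Non-numeric values (e.g. userID 'gK') are sent unchanged
        text = raw
    return {"FeatureName": name, "ValueAsString": text}

print(to_feature_value("rating_food", 2.0))
```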


In our example we use the SageMaker Spark container as a base, so we only need to add the "delta-core" library as an additional JAR file. The script also relies on "boto3" for ingesting into the Feature Group, which is already available in the container, so it isn't passed to the job.
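`submit_jars` expects the JAR file to exist locally, next to the notebook. If you don't have it yet, one option is to download it from Maven Central, whose standard layout is `<group path>/<artifact>/<version>/<artifact>-<version>.jar` (the `_2.12` suffix in the artifact name is the Scala version). The minimal sketch below assumes internet access from the notebook, and the helper names are hypothetical. To our understanding, Delta Lake 1.0.x targets Spark 3.1, which is why `delta-core_2.12-1.0.1` pairs with `framework_version="3.1"`; check the Delta Lake release notes if you change either version.

```python
import urllib.request
from pathlib import Path

MAVEN_CENTRAL = "https://repo1.maven.org/maven2"

def maven_jar_url(group_id: str, artifact_id: str, version: str) -> str:
    """Build the Maven Central URL of a JAR from its coordinates."""
    group_path = group_id.replace(".", "/")
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"

def download_jar(group_id: str, artifact_id: str, version: str, dest_dir: str = ".") -> Path:
    """Download the JAR into dest_dir unless it is already there; return its local path."""
    dest = Path(dest_dir) / f"{artifact_id}-{version}.jar"
    if not dest.exists():
        urllib.request.urlretrieve(maven_jar_url(group_id, artifact_id, version), str(dest))
    return dest

# Usage (requires network access):
# download_jar("io.delta", "delta-core_2.12", "1.0.1")  # -> delta-core_2.12-1.0.1.jar
```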

In [33]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

spark_processor.run(
    submit_app="./code/preprocessing.py",
    submit_jars=["delta-core_2.12-1.0.1.jar"],
    arguments=[
        '--region', region,
        '--table', table,
        '--feature-group', feature_group_name,
        '--output-path', output_path, 
    ],
)

INFO:sagemaker.spark.processing:Copying dependency from local path delta-core_2.12-1.0.1.jar to tmpdir /tmp/tmpadkl4tnp
INFO:sagemaker.spark.processing:Uploading dependencies from tmpdir /tmp/tmpadkl4tnp to S3 s3://sagemaker-eu-west-1-889960878219/delta-to-sagemaker--2023-01-30-13-46-08-745/input/jars
INFO:sagemaker:Creating processing-job with name delta-to-sagemaker--2023-01-30-13-46-08-745



Job Name:  delta-to-sagemaker--2023-01-30-13-46-08-745
Inputs:  [{'InputName': 'jars', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-889960878219/delta-to-sagemaker--2023-01-30-13-46-08-745/input/jars', 'LocalPath': '/opt/ml/processing/input/jars', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-889960878219/delta-to-sagemaker--2023-01-30-13-46-08-745/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  []
.........................01-30 13:50 smspark.cli  INFO     Parsing arguments. argv: ['/usr/local/bin/smspark-submit', '--jars', '/opt/ml/processing/input/jars', '/opt/ml/processing/input/code/preprocessing.py', '--region', 'eu-west-1', '
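In the job above, `ingest_to_feature_store` runs on the Spark driver and sends one `put_record` call per row sequentially, so ingestion time grows with the number of rows and is not spread across instances when `instance_count > 1`. For larger tables, one option is to issue the calls concurrently. The sketch below is hypothetical (it is not the code used in this job) and assumes a client with the boto3 `put_record(FeatureGroupName=..., Record=...)` signature; boto3 low-level clients can be shared across threads. The SageMaker Python SDK's `FeatureGroup.ingest(data_frame=..., max_workers=...)` is another option worth evaluating.

```python
from concurrent.futures import ThreadPoolExecutor

def put_records_concurrently(client, feature_group_name: str, records: list, max_workers: int = 8) -> int:
    """Send each record with client.put_record using a thread pool; return how many were sent."""
    def put(record):
        client.put_record(FeatureGroupName=feature_group_name, Record=record)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() consumes the results so any exception raised by put_record is re-raised here
        list(pool.map(put, records))
    return len(records)

# Usage inside the script (hypothetical), with records built by transform_row:
# records = [transform_row(rows.columns, row) for _, row in rows.iterrows()]
# put_records_concurrently(featurestore_runtime_client, fg, records)
```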

Let's read one of the processed files generated by the job.

In [65]:
boto_session = boto3.Session()
s3 = boto_session.resource('s3')
my_bucket = s3.Bucket(bucket)

for objects in my_bucket.objects.filter(Prefix="delta_to_sagemaker/processing_output/processed_features.csv/"):
    if objects.key.endswith('csv'):
        print(objects.key)
        processed_features = pd.read_csv(f's3://{bucket}/{objects.key}', index_col=None, header=None)
        break
processed_features.head()


delta_to_sagemaker/processing_output/processed_features.csv/part-00000-8403d78e-1f33-4dfb-994d-47f211d9c8b4-c000.csv


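|   | 0 | 1 | 2 | 3 | 4 | 5 | 6 |
|---|---|---|---|---|---|---|---|
| 0 | 0 | 2022-08-25 | gK | 681 | 1 | 2 | 2 |
| 1 | 1 | 2022-08-25 | gK | 719 | 1 | 1 | 1 |
| 2 | 2 | 2022-08-25 | gK | 1128 | 1 | 2 | 2 |
| 3 | 3 | 2022-08-25 | gK | 1203 | 1 | 2 | 2 |
| 4 | 4 | 2022-08-25 | gK | 1058 | 1 | 1 | 1 |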


----

### Verifying processed data in SageMaker Feature Store

Using an arbitrary record identifier (`rowID` 5), we use `get_record` to check that the data has been ingested into the Feature Group.

In [58]:
record_id = 5  # value of rowID, the record identifier of this Feature Group
sample_record = sm_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).get_record(
    FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=str(record_id)
)

In [59]:
sample_record

{'ResponseMetadata': {'RequestId': 'a405963b-7c08-42dc-b9e8-eed86ab6ffe1',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a405963b-7c08-42dc-b9e8-eed86ab6ffe1',
   'content-type': 'application/json',
   'content-length': '370',
   'date': 'Mon, 30 Jan 2023 14:16:44 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'rowID', 'ValueAsString': '5'},
  {'FeatureName': 'userID', 'ValueAsString': 'gK'},
  {'FeatureName': 'placeID', 'ValueAsString': '585'},
  {'FeatureName': 'rating_overall', 'ValueAsString': '1'},
  {'FeatureName': 'rating_food', 'ValueAsString': '0'},
  {'FeatureName': 'rating_service', 'ValueAsString': '0'},
  {'FeatureName': 'EventTime', 'ValueAsString': '1675086670.685034'}]}
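`get_record` returns every value as a string (`ValueAsString`), and features without a value, such as the dropped `ratingID`, are simply absent from the record. A minimal sketch to turn a record into a Python dict with native types, assuming the `FeatureDefinitions` list returned by `feature_group.describe()`; the function name is hypothetical:

```python
# Casts for the Feature Store types used in this Feature Group
CASTS = {"String": str, "Integral": int, "Fractional": float}

def record_to_typed_dict(record: list, feature_definitions: list) -> dict:
    """Convert a get_record 'Record' list into {feature_name: typed value}."""
    types = {d["FeatureName"]: d["FeatureType"] for d in feature_definitions}
    return {
        f["FeatureName"]: CASTS[types[f["FeatureName"]]](f["ValueAsString"])
        for f in record
    }

# Usage with the notebook objects:
# record_to_typed_dict(sample_record["Record"], feature_group.describe()["FeatureDefinitions"])
```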

We can also use `batch_get_record` to check multiple records ingested into the Feature Group by providing their record identifiers (`rowID` values).

In [60]:
all_records = sm_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).batch_get_record(
    Identifiers=[
        {
            "FeatureGroupName": feature_group_name,
            "RecordIdentifiersValueAsString": ["57", "10", "8", "124"],
        }
    ]
)

In [61]:
all_records

{'ResponseMetadata': {'RequestId': '41240b8b-dd53-4245-9a78-602b22d3dcf5',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '41240b8b-dd53-4245-9a78-602b22d3dcf5',
   'content-type': 'application/json',
   'content-length': '1814',
   'date': 'Mon, 30 Jan 2023 14:16:47 GMT'},
  'RetryAttempts': 0},
 'Records': [{'FeatureGroupName': 'rating-fg',
   'RecordIdentifierValueAsString': '57',
   'Record': [{'FeatureName': 'rowID', 'ValueAsString': '57'},
    {'FeatureName': 'userID', 'ValueAsString': 'gR'},
    {'FeatureName': 'placeID', 'ValueAsString': '984'},
    {'FeatureName': 'rating_overall', 'ValueAsString': '1'},
    {'FeatureName': 'rating_food', 'ValueAsString': '1'},
    {'FeatureName': 'rating_service', 'ValueAsString': '1'},
    {'FeatureName': 'EventTime', 'ValueAsString': '1675086671.305091'}]},
  {'FeatureGroupName': 'rating-fg',
   'RecordIdentifierValueAsString': '10',
   'Record': [{'FeatureName': 'rowID', 'ValueAsString': '10'},
    {'FeatureName': 'userID',
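`batch_get_record` returns the records it found under `Records`, each tagged with its `RecordIdentifierValueAsString`; the response can also include `Errors` and `UnprocessedIdentifiers`. A minimal sketch (the helper is hypothetical) that indexes the results by identifier and reports which requested identifiers came back without a record:

```python
def index_batch_records(response: dict, requested_ids: list):
    """Return ({record_id: {feature: value}}, [record_ids returned without a record])."""
    found = {
        item["RecordIdentifierValueAsString"]: {
            f["FeatureName"]: f["ValueAsString"] for f in item["Record"]
        }
        for item in response.get("Records", [])
    }
    missing = [rid for rid in requested_ids if rid not in found]
    return found, missing

# Usage with the notebook objects:
# found, missing = index_batch_records(all_records, ["57", "10", "8", "124"])
# if missing: print("No record returned for", missing, all_records.get("Errors"))
```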

----------

### (Optional) Clean-up

In [12]:
#Delete the feature group
!aws sagemaker delete-feature-group --feature-group-name rating-fg
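According to the `DeleteFeatureGroup` API reference, deleting a Feature Group does not delete the data already written to its offline store in S3, nor the AWS Glue Data Catalog table created for it. A minimal sketch for removing the offline store objects, assuming you read `ResolvedOutputS3Uri` from `feature_group.describe()` before running the delete cell above; the helper names are hypothetical. Avoid pointing it at `s3://{bucket}/delta_to_sagemaker/`, since that prefix also holds the Delta Table (`delta_format/`) and the processing outputs.

```python
from urllib.parse import urlparse

def split_s3_uri(uri: str):
    """Split 's3://bucket/key/prefix' into ('bucket', 'key/prefix')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an s3:// URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")

def delete_s3_prefix(s3_resource, uri: str) -> None:
    """Delete every object under an S3 prefix using a boto3 S3 resource."""
    bucket_name, prefix = split_s3_uri(uri)
    if not prefix:
        # Guard against wiping a whole bucket by mistake
        raise ValueError("Refusing to delete an entire bucket")
    s3_resource.Bucket(bucket_name).objects.filter(Prefix=prefix).delete()

# Usage, before deleting the Feature Group:
# resolved = feature_group.describe()["OfflineStoreConfig"]["S3StorageConfig"]["ResolvedOutputS3Uri"]
# delete_s3_prefix(boto3.resource("s3"), resolved)
```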