In [18]:
import os
import io
import pandas as pd
import numpy as np
from helper import *
import time
from time import gmtime, strftime
import boto3
from scipy.sparse import hstack
import sagemaker
from sklearn.preprocessing import OneHotEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
import sagemaker.amazon.common as smac
from sagemaker.serializers import CSVSerializer
from sagemaker.inputs import TrainingInput
from sagemaker.deserializers import JSONDeserializer
from sagemaker.lambda_helper import Lambda

In [19]:
# Resolve the execution role, SageMaker session, default S3 bucket and region
# that the remaining cells reference.
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name
# NOTE(review): s3_client is not referenced by any cell visible in this notebook section.
s3_client = boto3.client('s3', region_name=region)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [21]:
# S3 key prefix passed to create_feature_group and reused later for the ranking-model artifacts.
prefix = 'clickstreamrec-feature-store'

# Feature Store variables
fs_prefix = 'clickstreamrec-'
# Month-day-hour-minute (UTC) suffix; names generated within the same minute are identical.
current_timestamp = strftime('%m-%d-%H-%M', gmtime())
customers_feature_group_name = f'{fs_prefix}customers-fg-{current_timestamp}'
products_feature_group_name = f'{fs_prefix}products-fg-{current_timestamp}'
orders_feature_group_name = f'{fs_prefix}orders-fg-{current_timestamp}'
click_stream_historical_feature_group_name = f'{fs_prefix}click-stream-historical-fg-{current_timestamp}'
click_stream_feature_group_name = f'{fs_prefix}click-stream-fg-{current_timestamp}'

In [22]:
# Load the five source tables from CSV files in the current working directory.
df_customers = pd.read_csv('customers.csv')
df_products = pd.read_csv('products.csv')
df_orders = pd.read_csv('orders.csv')
df_click_stream_historical = pd.read_csv('click_stream_historical.csv')
df_click_stream = pd.read_csv('click_stream.csv')

In [23]:
# Echo the generated feature group names so they can be located in the console.
print('Feature groups names:')
for feature_group_name in (
    customers_feature_group_name,
    products_feature_group_name,
    orders_feature_group_name,
    click_stream_historical_feature_group_name,
    click_stream_feature_group_name,
):
    print(feature_group_name)

Feature groups names:

clickstreamrec-customers-fg-12-07-00-51
clickstreamrec-products-fg-12-07-00-51
clickstreamrec-orders-fg-12-07-00-51
clickstreamrec-click-stream-historical-fg-12-07-00-51
clickstreamrec-click-stream-fg-12-07-00-51


In [24]:
# Create one feature group per source table. The third argument is the column used as
# the record identifier. create_feature_group is not defined in this notebook
# (presumably it comes from `from helper import *`); judging by the cell output it
# waits until each group has been created -- TODO confirm against helper.py.
customers_feature_group = create_feature_group(df_customers, customers_feature_group_name,
                                               'customer_id', prefix, sagemaker_session)
products_feature_group = create_feature_group(df_products, products_feature_group_name, 'product_id',
                                              prefix, sagemaker_session)
orders_feature_group = create_feature_group(df_orders, orders_feature_group_name, 'order_id', prefix,
                                            sagemaker_session)
click_stream_historical_feature_group = create_feature_group(df_click_stream_historical,
                                                             click_stream_historical_feature_group_name,
                                                             'click_stream_id', prefix, sagemaker_session)
# Unlike the historical group, the live click-stream group is keyed by customer_id.
click_stream_feature_group = create_feature_group(df_click_stream, click_stream_feature_group_name, 'customer_id',
                                                  prefix, sagemaker_session)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
Initial status: Creating
Waiting for feature group: clickstreamrec-customers-fg-12-07-00-51 to be created ...
Waiting for feature group: clickstreamrec-customers-fg-12-07-00-51 to be created ...
Waiting for feature group: clickstreamrec-customers-fg-12-07-00-51 to be created ...
Waiting for feature group: clickstreamrec-customers-fg-12-07-00-51 to be created ...
FeatureGroup clickstreamrec-customers-fg-12-07-00-51 was successfully created.
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
Initial status: Creating
Waiting for feature group: clickstreamrec-products-fg-12-07-00-51 to be created ...
Waiting for feature group: clickstreamrec-products-fg

In [25]:
# Athena table name backing each feature group; these are interpolated into the
# SQL queries further down.
customers_table = customers_feature_group.athena_query().table_name
products_table = products_feature_group.athena_query().table_name
orders_table = orders_feature_group.athena_query().table_name
click_stream_historical_table = click_stream_historical_feature_group.athena_query().table_name
click_stream_table = click_stream_feature_group.athena_query().table_name

In [31]:
def ingest_into_feature_group(df, feature_group):
    """Ingest every row of ``df`` into ``feature_group`` and report the outcome.

    Ingestion is best-effort: a failure is printed rather than re-raised, so
    the notebook can continue with the remaining feature groups.

    Args:
        df: DataFrame whose rows are ingested.
        feature_group: Object exposing ``name`` and
            ``ingest(data_frame=..., max_workers=..., wait=...)``.

    Returns:
        None.
    """
    print(f"Ingesting data into feature group: {feature_group.name}")
    try:
        feature_group.ingest(data_frame=df, max_workers=5, wait=True)
    except Exception as e:
        print(e)
        # When ingest() raises, no ingestion manager was returned, so the failed
        # rows can only come from the exception itself (the SDK's IngestionError
        # carries a `failed_rows` attribute; other exceptions do not).
        failed_rows = getattr(e, "failed_rows", None)
        if failed_rows is not None:
            print(f"Num failed rows: {len(failed_rows)}")
            print(f"Failed rows: {failed_rows}")
        # Do not fall through to the success message after a failure.
        return
    print(f"{len(df)} records ingested into feature group: {feature_group.name}")

In [30]:
# Ingest the customers table first, on its own.
ingest_into_feature_group(df_customers, customers_feature_group)

Ingesting data into feature group: clickstreamrec-customers-fg-12-07-00-51...
Waiting for ingestion to complete {'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:205825199765:feature-group/clickstreamrec-customers-fg-12-07-00-51', 'FeatureGroupName': 'clickstreamrec-customers-fg-12-07-00-51', 'RecordIdentifierFeatureName': 'customer_id', 'EventTimeFeatureName': 'event_time', 'FeatureDefinitions': [{'FeatureName': 'customer_id', 'FeatureType': 'String'}, {'FeatureName': 'name', 'FeatureType': 'String'}, {'FeatureName': 'state', 'FeatureType': 'String'}, {'FeatureName': 'age', 'FeatureType': 'Integral'}, {'FeatureName': 'is_married', 'FeatureType': 'Integral'}, {'FeatureName': 'customer_health_index', 'FeatureType': 'Fractional'}, {'FeatureName': 'event_time', 'FeatureType': 'Fractional'}], 'CreationTime': datetime.datetime(2023, 12, 7, 0, 51, 50, 868000, tzinfo=tzlocal()), 'OnlineStoreConfig': {'EnableOnlineStore': True}, 'OfflineStoreConfig': {'S3StorageConfig': {'S3Uri': 's3://sagemake

In [32]:
# Ingest the remaining tables, pairing each DataFrame with its feature group.
for source_frame, target_group in (
    (df_products, products_feature_group),
    (df_orders, orders_feature_group),
    (df_click_stream_historical, click_stream_historical_feature_group),
):
    ingest_into_feature_group(source_frame, target_group)

Ingesting data into feature group: clickstreamrec-products-fg-12-07-00-51...
17001 records ingested into feature group: clickstreamrec-products-fg-12-07-00-51
Ingesting data into feature group: clickstreamrec-orders-fg-12-07-00-51...
99975 records ingested into feature group: clickstreamrec-orders-fg-12-07-00-51
Ingesting data into feature group: clickstreamrec-click-stream-historical-fg-12-07-00-51...
199950 records ingested into feature group: clickstreamrec-click-stream-historical-fg-12-07-00-51


In [36]:
# Runtime client for the Feature Store; used later for get_record lookups.
featurestore_runtime = boto3.client(service_name='sagemaker-featurestore-runtime',
                                    region_name=region)

In [37]:
# S3 layout for the collaborative-filtering (factorization machines) data.
# A dedicated name is used here so that `prefix` keeps a single meaning
# ('clickstreamrec-feature-store') throughout the notebook instead of being
# rebound temporarily inside this cell.
model_prefix = 'clickstreamrec'
train_key = 'train.protobuf'
train_prefix = f'{model_prefix}/train'
test_key = 'test.protobuf'
test_prefix = f'{model_prefix}/test'
output_prefix = f's3://{default_bucket}/{model_prefix}/output'

# Fresh timestamp so the endpoint names are unique per run (minute resolution, UTC).
current_timestamp = strftime('%m-%d-%H-%M', gmtime())
# NOTE(review): query_results is not referenced by any cell visible in this section.
query_results = 'sagemaker-clickstreamrec-featurestore'
# Kept explicit so this cell still leaves `prefix` at the value later cells expect.
prefix = 'clickstreamrec-feature-store'
cf_model_endpoint_name = f'clickstreamrec-collabfilter-model-{current_timestamp}'
ranking_model_endpoint_name = f'clickstreamrec-ranking-model-{current_timestamp}'

In [40]:
# Training set for the collaborative-filtering model: historical click-stream rows
# left-joined to customers (on customer_id) and then to products (on product_id),
# restricted to rows where bought = 1.
query = f'''
select click_stream_customers.customer_id,
       products.product_id,
       rating,
       state,
       age,
       is_married,
       product_name
from (
    select c.customer_id,
           cs.product_id,
           cs.bought,
           cs.rating,
           c.state,
           c.age,
           c.is_married
    from "{click_stream_historical_table}" as cs
    left join "{customers_table}" as c
    on cs.customer_id = c.customer_id
) click_stream_customers
left join
(select * from "{products_table}") products
on click_stream_customers.product_id = products.product_id
where click_stream_customers.bought = 1
'''

# query_offline_store is not defined in this notebook (presumably from helper.py).
# Note that `query` is rebound to the helper's second return value.
# NOTE(review): the displayed head() contains adjacent identical rows -- verify
# whether the offline store holds duplicated records (e.g. from repeated ingestion).
df_cf_features, query = query_offline_store(click_stream_feature_group_name, query,
                                            sagemaker_session)
df_cf_features.head()

Unnamed: 0,customer_id,product_id,rating,state,age,is_married,product_name
0,C7067,P3590,2.311836,new hampshire,34,0,low fat madagascar vanilla kefir probiotic cul...
1,C7067,P3590,2.311836,new hampshire,34,0,low fat madagascar vanilla kefir probiotic cul...
2,C1290,P10763,1.198205,missouri,43,0,turkey lasagna
3,C1290,P10763,1.198205,missouri,43,0,turkey lasagna
4,C7166,P14034,2.151193,kansas,77,0,organic ginger root powder


In [41]:
def transform_cf_data(training_df, inference_df=None):
    """Build the sparse feature matrix for the collaborative-filtering model.

    The one-hot encoder and the TF-IDF vectorizer are always fitted on
    ``training_df``; the rows that get encoded are those of ``inference_df``
    when it is supplied, otherwise those of ``training_df`` itself.

    Args:
        training_df: DataFrame with columns ``product_id``, ``customer_id``,
            ``is_married``, ``state`` and ``product_name``.
        inference_df: Optional DataFrame with the same columns to encode.

    Returns:
        CSR float32 matrix: one-hot columns followed by TF-IDF columns.
    """
    onehot_cols = ['product_id', 'customer_id', 'is_married',
                   'state']
    rows_to_encode = training_df if inference_df is None else inference_df

    # Categories that were not seen while fitting are ignored instead of raising.
    encoder = OneHotEncoder(handle_unknown='ignore')
    encoder.fit(training_df[onehot_cols])
    onehot_output = encoder.transform(rows_to_encode[onehot_cols])

    # The vocabulary is learned from distinct product names only.
    tfidf = TfidfVectorizer(min_df=2)
    tfidf.fit(training_df['product_name'].unique())
    tfidf_output = tfidf.transform(rows_to_encode['product_name'])

    return hstack([onehot_output, tfidf_output], format='csr', dtype='float32')
    
def load_dataset(df):
    """Return ``(features, ratings)`` for ``df``.

    ``features`` is whatever ``transform_cf_data(df)`` produces and
    ``ratings`` is the ``rating`` column as a float32 array.
    """
    features = transform_cf_data(df)
    ratings = df['rating'].values.astype('float32')
    return features, ratings
# Encode the full collaborative-filtering frame: X is the sparse feature matrix, y the ratings.
X, y = load_dataset(df_cf_features)

In [42]:
# Hold out 20% of the rows for testing; the fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [1]:
def write_data_to_protobuf(X, y, bucket, prefix, key):
    """Serialize ``X`` and ``y`` with ``smac.write_spmatrix_to_sparse_tensor`` and upload to S3.

    Args:
        X: Sparse feature matrix handed to the serializer.
        y: Labels handed to the serializer.
        bucket: Destination S3 bucket name.
        prefix: Key prefix inside the bucket.
        key: Object name appended to ``prefix``.

    Returns:
        The ``s3://bucket/prefix/key`` URI of the uploaded object.
    """
    s3_key = f"{prefix}/{key}"

    payload = io.BytesIO()
    smac.write_spmatrix_to_sparse_tensor(payload, X, y)
    # Rewind so the upload reads from the start of the buffer.
    payload.seek(0)

    s3_object = boto3.resource("s3").Bucket(bucket).Object(s3_key)
    s3_object.upload_fileobj(payload)
    return f"s3://{bucket}/{s3_key}"

# Upload both splits to the default bucket and keep their S3 URIs for the training job.
train_data_location = write_data_to_protobuf(X_train, y_train, default_bucket, train_prefix, train_key)
test_data_location = write_data_to_protobuf(X_test, y_test, default_bucket, test_prefix, test_key)

In [45]:
# Configure the factorization-machines estimator for the collaborative-filtering model.
container = sagemaker.image_uris.retrieve("factorization-machines", region=region)

fm = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    output_path=output_prefix,
    sagemaker_session=sagemaker_session,
)

# feature_dim is taken from the column count of the encoded training matrix.
input_dims = X_train.shape[1]
fm.set_hyperparameters(
    feature_dim=input_dims,
    predictor_type="regressor",
    mini_batch_size=1000,
    num_factors=64,
    epochs=20,
)

In [46]:
# Launch the training job with the protobuf files as the 'train' and 'test' channels.
fm.fit({'train': train_data_location, 'test': test_data_location})

INFO:sagemaker:Creating training-job with name: factorization-machines-2023-12-07-01-22-07-573


2023-12-07 01:22:07 Starting - Starting the training job...
2023-12-07 01:22:22 Starting - Preparing the instances for training.........
2023-12-07 01:23:40 Downloading - Downloading input data...
2023-12-07 01:24:10 Training - Downloading the training image....................[34mDocker entrypoint called with argument(s): train[0m
[34mRunning default environment configuration script[0m
  if num_device is 1 and 'dist' not in kvstore:[0m
[34m[12/07/2023 01:27:47 INFO 139744468858688] Reading default configuration from /opt/amazon/lib/python3.8/site-packages/algorithm/resources/default-conf.json: {'epochs': 1, 'mini_batch_size': '1000', 'use_bias': 'true', 'use_linear': 'true', 'bias_lr': '0.1', 'linear_lr': '0.001', 'factors_lr': '0.0001', 'bias_wd': '0.01', 'linear_wd': '0.001', 'factors_wd': '0.00001', 'bias_init_method': 'normal', 'bias_init_sigma': '0.01', 'linear_init_method': 'normal', 'linear_init_sigma': '0.01', 'factors_init_method': 'normal', 'factors_init_sigma': '0.001

In [48]:
# Deploy the trained model to a named endpoint.
# FMSerializer is not defined in this notebook (presumably from `from helper import *`).
# NOTE(review): with wait=False the call is expected to return before the endpoint is
# in service, so the predictor may not be usable immediately -- confirm.
cf_model_predictor = fm.deploy(
    endpoint_name = cf_model_endpoint_name,
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    serializer=FMSerializer(),
    deserializer=JSONDeserializer(),
    wait=False
)

INFO:sagemaker:Creating model with name: factorization-machines-2023-12-07-01-29-25-129
INFO:sagemaker:Creating endpoint-config with name clickstreamrec-collabfilter-model-12-07-01-18
INFO:sagemaker:Creating endpoint with name clickstreamrec-collabfilter-model-12-07-01-18


In [50]:
# Training set for the ranking model: every historical click-stream row (bought or not)
# left-joined to customers and products, keeping the label `bought` as the first column.
query = f'''
select bought,
       healthy_activity_last_2m,
       product_health_index,
       customer_health_index,
       product_category
from (
    select c.customer_health_index,
           cs.product_id,
           cs.healthy_activity_last_2m,
           cs.bought
    from "{click_stream_historical_table}" as cs
    left join "{customers_table}" as c
    on cs.customer_id = c.customer_id
) click_stream_customers
left join
(select * from "{products_table}") products
on click_stream_customers.product_id = products.product_id
'''

# query_offline_store is not defined in this notebook (presumably from helper.py);
# `query` is rebound to its second return value.
df_rank_features, query = query_offline_store(click_stream_feature_group_name, query,
                                              sagemaker_session)
df_rank_features.head()

INFO:sagemaker:Query e6cef6e3-8913-4e78-a186-5f0a084d87c3 is being executed.
INFO:sagemaker:Query e6cef6e3-8913-4e78-a186-5f0a084d87c3 is being executed.
INFO:sagemaker:Query e6cef6e3-8913-4e78-a186-5f0a084d87c3 successfully executed.


Unnamed: 0,bought,healthy_activity_last_2m,product_health_index,customer_health_index,product_category
0,0,1,0.9,0.380234,tea
1,0,1,0.9,0.380234,tea
2,1,7,0.7,0.25212,coffee
3,1,7,0.7,0.25212,coffee
4,0,0,0.9,0.004994,vitamins_supplements


In [51]:
# One-hot encode product_category into prod_cat_* columns appended after the
# remaining columns, dropping the original column.
# The guard makes the cell safe to re-run (the column is gone after the first run).
# dtype=int keeps the indicators numeric (0/1) on pandas versions whose default
# dummy dtype is bool, so the CSV written later stays numeric.
if 'product_category' in df_rank_features.columns:
    df_rank_features = pd.get_dummies(
        df_rank_features,
        columns=['product_category'],
        prefix='prod_cat',
        dtype=int,
    )
df_rank_features.head()

Unnamed: 0,bought,healthy_activity_last_2m,product_health_index,customer_health_index,prod_cat_baby_food_formula,prod_cat_baking_ingredients,prod_cat_candy_chocolate,prod_cat_chips_pretzels,prod_cat_cleaning_products,prod_cat_coffee,...,prod_cat_hair_care,prod_cat_ice_cream_ice,prod_cat_juice_nectars,prod_cat_packaged_cheese,prod_cat_refrigerated,prod_cat_soup_broth_bouillon,prod_cat_spices_seasonings,prod_cat_tea,prod_cat_vitamins_supplements,prod_cat_yogurt
0,0,1,0.9,0.380234,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0
1,0,1,0.9,0.380234,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0
2,1,7,0.7,0.25212,0,0,0,0,0,1,...,0,0,0,0,0,0,0,0,0,0
3,1,7,0.7,0.25212,0,0,0,0,0,1,...,0,0,0,0,0,0,0,0,0,0
4,0,0,0.9,0.004994,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,1,0


In [52]:
# Shuffle once with a fixed seed, then slice: 70% train, next 20% validation.
# The last 10% is held out and not used in this cell.
# Positional slicing replaces np.split, which is deprecated for DataFrames.
shuffled_rank_features = df_rank_features.sample(frac=1, random_state=1729)
train_end = int(0.7 * len(shuffled_rank_features))
validation_end = int(0.9 * len(shuffled_rank_features))
train_data = shuffled_rank_features.iloc[:train_end]
validation_data = shuffled_rank_features.iloc[train_end:validation_end]

# Written without header or index.
train_data.to_csv('train.csv', header=False, index=False)
validation_data.to_csv('validation.csv', header=False, index=False)

# S3 keys always use '/', so build them explicitly rather than with os.path.join.
train_s3_key = f'{prefix}/train/train.csv'
validation_s3_key = f'{prefix}/validation/validation.csv'
rank_data_bucket = boto3.Session().resource('s3').Bucket(default_bucket)
rank_data_bucket.Object(train_s3_key).upload_file('train.csv')
rank_data_bucket.Object(validation_s3_key).upload_file('validation.csv')

s3_input_train = TrainingInput(s3_data=f's3://{default_bucket}/{train_s3_key}', content_type='csv')
s3_input_validation = TrainingInput(s3_data=f's3://{default_bucket}/{validation_s3_key}', content_type='csv')

In [53]:
# Train the ranking model with XGBoost (image version 1.2-2) on the CSV channels
# uploaded above. Note that `container` is rebound here; it previously held the
# factorization-machines image URI.
container = sagemaker.image_uris.retrieve('xgboost', region, version='1.2-2')

xgb = sagemaker.estimator.Estimator(container,
                                    role, 
                                    instance_count=1, 
                                    instance_type='ml.m4.xlarge',
                                    output_path='s3://{}/{}/output'.format(default_bucket, prefix),
                                    sagemaker_session=sagemaker_session)

# Binary classification on the `bought` label.
# NOTE(review): the CSV has no header and `bought` is its first column; confirm this
# is where the training container expects the label.
xgb.set_hyperparameters(
    max_depth= 5,
    eta= 0.2,
    gamma= 4,
    min_child_weight= 6,
    subsample= 0.7,
    objective= 'binary:logistic',
    num_round= 50,
    verbosity= 2
)

xgb.fit({'train': s3_input_train, 'validation': s3_input_validation})

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2023-12-07-01-34-43-910


2023-12-07 01:34:44 Starting - Starting the training job......
2023-12-07 01:35:19 Starting - Preparing the instances for training......
2023-12-07 01:36:44 Downloading - Downloading input data......
2023-12-07 01:37:15 Training - Downloading the training image........[34m[2023-12-07 01:38:56.213 ip-10-2-98-94.ec2.internal:7 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2023-12-07:01:38:56:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2023-12-07:01:38:56:INFO] Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34m[2023-12-07:01:38:56:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2023-12-07:01:38:56:INFO] Running XGBoost Sagemaker in algorithm mode[0m
[34m[2023-12-07:01:38:56:INFO] Determined delimiter of CSV input is ','[0m
[34m[2023-12-07:01:38:56:INFO] Determined delimiter of CSV input is ','[0m
[34m[2023-12-07:01:38:56:INFO] Determined delimiter of CSV i

In [54]:
# Deploy the ranking model to its own endpoint; requests are serialized as CSV.
# NOTE(review): with wait=False the call is expected to return before the endpoint is
# in service -- confirm before invoking the predictor.
xgb_predictor = xgb.deploy(
    endpoint_name = ranking_model_endpoint_name,
    initial_instance_count = 1,
    instance_type = 'ml.m4.xlarge',
    serializer = CSVSerializer(),
    wait=False
)

INFO:sagemaker:Creating model with name: sagemaker-xgboost-2023-12-07-01-40-12-968
INFO:sagemaker:Creating endpoint-config with name clickstreamrec-ranking-model-12-07-01-18
INFO:sagemaker:Creating endpoint with name clickstreamrec-ranking-model-12-07-01-18


Ranking model endpoint name: clickstreamrec-ranking-model-12-07-01-18


In [56]:
def top_rated_products_by_customer_state(customer_id, top_n):
    """Return the ``top_n`` best-rated products in the customer's state.

    The customer's ``state``, ``is_married`` and ``age`` are read from the
    customers feature group. Products are ranked by mean ``rating`` over the
    rows of ``df_cf_features`` that share the customer's state, and the
    customer's features are broadcast onto every returned row.

    Args:
        customer_id: Record identifier of the customer in the feature group.
        top_n: Maximum number of products to return.

    Returns:
        DataFrame with the columns of ``df_cf_features`` except ``rating``,
        in the same order, with at most ``top_n`` rows.

    Raises:
        KeyError: If the feature store returns no record for ``customer_id``.
    """
    record = featurestore_runtime.get_record(FeatureGroupName=customers_feature_group_name,
                                             RecordIdentifierValueAsString=customer_id,
                                             FeatureNames=['state', 'is_married', 'age'])
    if 'Record' not in record:
        raise KeyError(f'No customer record found for customer_id={customer_id!r}')
    other_customer_features = {
        feature['FeatureName']: feature['ValueAsString'] for feature in record['Record']
    }

    state = other_customer_features['state']
    df_cf_features_by_state = df_cf_features[df_cf_features['state'] == state]

    popular_items = (
        df_cf_features_by_state
        .groupby(["product_id", "product_name"])['rating']
        .agg('mean')
        .sort_values(ascending=False)
        .reset_index()
    )
    for feature_name, feature_value in other_customer_features.items():
        # The feature store returns every value as a string; cast it to the dtype
        # of the matching df_cf_features column so numeric features such as
        # is_married and age keep the type they had in the training frame.
        if feature_name in df_cf_features.columns:
            target_dtype = df_cf_features[feature_name].dtype
            feature_value = pd.Series([feature_value]).astype(target_dtype).iloc[0]
        popular_items[feature_name] = feature_value
    popular_items['customer_id'] = customer_id

    top_n_popular_items = popular_items.iloc[0:top_n][list(df_cf_features.columns)]
    # drop() returns a new frame, avoiding in-place deletion on a slice.
    return top_n_popular_items.drop(columns=['rating'])

In [65]:
# Names of the streaming resources created in the cells below.
featurestore_runtime = boto3.client(service_name='sagemaker-featurestore-runtime',
                                    region_name=region)
kinesis_stream_name = 'fs-click-stream-activity'
kinesis_analytics_application_name = 'fs-click-stream-application'
lambda_name = 'click-stream-aggregator-lambda'
# NOTE(review): n_range is not referenced by any cell visible in this section.
n_range = 6

In [195]:
# Create the input Kinesis stream (one shard) and block until it is ACTIVE.
kinesis_client = boto3.client('kinesis')
try:
    kinesis_client.create_stream(StreamName=kinesis_stream_name, ShardCount=1)
except kinesis_client.exceptions.ResourceInUseException:
    # Re-running the cell: the stream is already there, so just wait for it below.
    print(f'Kinesis stream {kinesis_stream_name} already exists; reusing it.')

active_stream = False
while not active_stream:
    status = kinesis_client.describe_stream(StreamName=kinesis_stream_name)['StreamDescription']['StreamStatus']
    if status == 'ACTIVE':
        active_stream = True
        print('ACTIVE')
    elif status in ('CREATING', 'UPDATING'):
        print('Waiting for the Kinesis stream to become active...')
        time.sleep(20)
    else:
        # Any other status (e.g. DELETING) will never reach ACTIVE; fail loudly
        # instead of polling in a tight loop forever.
        raise RuntimeError(f'Kinesis stream {kinesis_stream_name} is in unexpected status {status}')

Waiting for the Kinesis stream to become active...
ACTIVE


In [198]:
# Create the output Kinesis stream (one shard), block until it is ACTIVE and record its ARN.
kinesis_client = boto3.client('kinesis')
kinesis_output_stream_name = 'fs-out-stream'
try:
    kinesis_client.create_stream(StreamName=kinesis_output_stream_name, ShardCount=1)
except kinesis_client.exceptions.ResourceInUseException:
    # Re-running the cell: the stream is already there, so just wait for it below.
    print(f'Kinesis stream {kinesis_output_stream_name} already exists; reusing it.')

active_stream = False
while not active_stream:
    status = kinesis_client.describe_stream(StreamName=kinesis_output_stream_name)['StreamDescription']['StreamStatus']
    if status == 'ACTIVE':
        active_stream = True
        print('ACTIVE')
    elif status in ('CREATING', 'UPDATING'):
        print('Waiting for the Kinesis stream to become active...')
        time.sleep(20)
    else:
        # Any other status (e.g. DELETING) will never reach ACTIVE; fail loudly
        # instead of polling in a tight loop forever.
        raise RuntimeError(f'Kinesis stream {kinesis_output_stream_name} is in unexpected status {status}')
output_stream_arn = kinesis_client.describe_stream(StreamName=kinesis_output_stream_name)['StreamDescription']['StreamARN']
print(f'Amazon kinesis stream arn: {output_stream_arn}')

Waiting for the Kinesis stream to become active...
ACTIVE
Amazon kinesis stream arn: arn:aws:kinesis:us-east-1:205825199765:stream/fs-out-stream


In [196]:
# ARN of the input stream; referenced by the Kinesis Analytics input configuration.
stream_arn = kinesis_client.describe_stream(StreamName=kinesis_stream_name)['StreamDescription']['StreamARN']
print(f'Amazon kinesis stream arn: {stream_arn}')

Amazon kinesis stream arn: arn:aws:kinesis:us-east-1:205825199765:stream/fs-click-stream-activity


In [71]:
# Client for Kinesis Data Analytics, used to create, start and poll the application.
kda_client = boto3.client('kinesisanalytics')

In [154]:
# Application SQL: for each CUSTOMER_ID, over a 2-minute stagger window, emit the sum of
# ACTIVITY_WEIGHT and the average PRODUCT_HEALTH_INDEX into DESTINATION_SQL_STREAM.
sql_code = '''
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  customer_id VARCHAR(8), 
  sum_activity_weight_last_2m INTEGER, 
  avg_product_health_index_last_2m DOUBLE
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" 
SELECT 
  STREAM CUSTOMER_ID, 
  SUM(ACTIVITY_WEIGHT) AS sum_activity_weight_last_2m, 
  AVG(PRODUCT_HEALTH_INDEX) AS avg_product_health_index_last_2m
FROM 
  "SOURCE_SQL_STREAM_001" 
WINDOWED BY STAGGER (
    PARTITION BY CUSTOMER_ID RANGE INTERVAL \'2\' MINUTE);
'''

In [199]:
# Input configuration for the analytics application: read JSON records from the input
# Kinesis stream and map their fields onto typed SQL columns.
# NOTE(review): the SQL reads from "SOURCE_SQL_STREAM_001"; presumably the service
# derives that name from this NamePrefix -- confirm.
kda_input_schema = [{
                'NamePrefix': 'SOURCE_SQL_STREAM',
                'KinesisStreamsInput': {
                       'ResourceARN': stream_arn,
                       'RoleARN': role
                },
                'InputSchema': {
                      'RecordFormat': {
                          'RecordFormatType': 'JSON',
                          'MappingParameters': {
                              'JSONMappingParameters': {
                                  'RecordRowPath': '$'
                              }
                          },
                      },
                      'RecordEncoding': 'UTF-8',
                      'RecordColumns': [
                          {'Name': 'EVENT_TIME',  'Mapping': '$.event_time',   'SqlType': 'TIMESTAMP'},
                          {'Name': 'CUSTOMER_ID','Mapping': '$.customer_id', 'SqlType': 'VARCHAR(8)'},
                          {'Name': 'PRODUCT_ID', 'Mapping': '$.product_id', 'SqlType': 'VARCHAR(8)'},
                          {'Name': 'PRODUCT_CATEGORY', 'Mapping': '$.product_category', 'SqlType': 'VARCHAR(20)'},
                          {'Name': 'HEALTH_CATEGORY', 'Mapping': '$.health_category', 'SqlType': 'VARCHAR(10)'},
                          {'Name': 'ACTIVITY_TYPE', 'Mapping': '$.activity_type', 'SqlType': 'VARCHAR(10)'},
                          {'Name': 'ACTIVITY_WEIGHT', 'Mapping': '$.activity_weight', 'SqlType': 'INTEGER'},
                          {'Name': 'PRODUCT_HEALTH_INDEX', 'Mapping': '$.product_health_index', 'SqlType': 'DOUBLE'}
                      ]
                }
              }                         
            ]

In [78]:
# Create the aggregator Lambda from lambda_stream.py using the SageMaker Lambda helper.
# The function reuses the notebook's execution role.
lambda_function = Lambda(
    function_name=lambda_name,
    execution_role_arn=role,
    script="lambda_stream.py",
    handler="lambda_stream.lambda_handler",
    timeout=600,
    memory_size=10240,
)

# NOTE(review): create() is likely to fail if a function with this name already
# exists, so the cell may not be re-runnable as is -- confirm.
lambda_function_response = lambda_function.create()
lambda_function_arn = lambda_function_response['FunctionArn']

print(f'Lambda function arn: {lambda_function_arn}')

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
Lambda function arn: arn:aws:lambda:us-east-1:205825199765:function:click-stream-aggregator-lambda


In [79]:
# Pass the click-stream feature group name to the Lambda through an environment variable.
lambda_client = boto3.client('lambda')
lambda_client.update_function_configuration(FunctionName=lambda_name,
                                            Environment={
                                                'Variables': {
                                                    'click_stream_feature_group_name': click_stream_feature_group_name
                                                }
                                            })

{'ResponseMetadata': {'RequestId': '841c9e1b-e41b-4765-9c0a-cb699c120f47',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 07 Dec 2023 03:08:36 GMT',
   'content-type': 'application/json',
   'content-length': '1529',
   'connection': 'keep-alive',
   'x-amzn-requestid': '841c9e1b-e41b-4765-9c0a-cb699c120f47'},
  'RetryAttempts': 0},
 'FunctionName': 'click-stream-aggregator-lambda',
 'FunctionArn': 'arn:aws:lambda:us-east-1:205825199765:function:click-stream-aggregator-lambda',
 'Runtime': 'python3.8',
 'Role': 'arn:aws:iam::205825199765:role/service-role/AmazonSageMaker-ExecutionRole-20230914T164936',
 'Handler': 'lambda_stream.lambda_handler',
 'CodeSize': 2542,
 'Description': '',
 'Timeout': 600,
 'MemorySize': 10240,
 'LastModified': '2023-12-07T03:08:36.000+0000',
 'CodeSha256': '7IYASAR3aHQm3lfO+5GmWJP9rdZ9XYYK4IB93aNY6Fs=',
 'Version': '$LATEST',
 'Environment': {'Variables': {'click_stream_feature_group_name': 'clickstreamrec-click-stream-fg-12-07-00-51'}},
 'Tracing

In [200]:
# Output configuration: write DESTINATION_SQL_STREAM rows as JSON to the output Kinesis stream.
kda_output_schema = [{'KinesisStreamsOutput': {
                       'ResourceARN': output_stream_arn,
                       'RoleARN': role
                },
                      'Name': 'DESTINATION_SQL_STREAM',
                      'DestinationSchema': {'RecordFormatType': 'JSON'}}]
print(f'KDA output schema: {kda_output_schema}')

KDA output schema: [{'KinesisStreamsOutput': {'ResourceARN': 'arn:aws:kinesis:us-east-1:205825199765:stream/fs-out-stream', 'RoleARN': 'arn:aws:iam::205825199765:role/service-role/AmazonSageMaker-ExecutionRole-20230914T164936'}, 'Name': 'DESTINATION_SQL_STREAM', 'DestinationSchema': {'RecordFormatType': 'JSON'}}]


In [201]:
# Create the Kinesis Analytics application exactly once, then poll its status until
# it is READY. The create call must stay outside the loop: issuing it again for an
# application that already exists fails instead of reporting progress.
response = kda_client.create_application(ApplicationName=kinesis_analytics_application_name,
                                         Inputs=kda_input_schema,
                                         Outputs=kda_output_schema,
                                         ApplicationCode=sql_code)
status = response['ApplicationSummary']['ApplicationStatus']

is_app_created = False
while not is_app_created:
    if status == 'READY':
        is_app_created = True
        print('Kinesis Analytics Application created')
    else:
        print('Waiting on Kinesis Analytics Application to be READY')
        time.sleep(20)
        # Refresh the status without re-creating the application.
        status = kda_client.describe_application(
            ApplicationName=kinesis_analytics_application_name
        )['ApplicationDetail']['ApplicationStatus']

Kinesis Analytics Application created


In [202]:
# Start the application, reading input '1.1' from the current position in the stream
# (records already in the stream are not replayed).
kda_client.start_application(ApplicationName=kinesis_analytics_application_name,
                             InputConfigurations=[{'Id': '1.1',
                                                   'InputStartingPositionConfiguration': 
                                                     {'InputStartingPosition':'NOW'}}])

{'ResponseMetadata': {'RequestId': '5a24a7c2-8533-42b2-9698-82df68478698',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5a24a7c2-8533-42b2-9698-82df68478698',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'date': 'Fri, 08 Dec 2023 05:09:56 GMT'},
  'RetryAttempts': 0}}

In [203]:
# Poll the application every 20 seconds until it reports RUNNING.
running_app = False
while True:
    app_detail = kda_client.describe_application(ApplicationName=kinesis_analytics_application_name)['ApplicationDetail']
    status = app_detail['ApplicationStatus']
    if status == 'RUNNING':
        running_app = True
        print('RUNNING')
        break
    print('Waiting for the Kinesis Application to be in RUNNING state...')
    time.sleep(20)

Waiting for the Kinesis Application to be in RUNNING state...
RUNNING
