# Implementing Amazon Personalize

In this notebook, we'll create a dataset group in Amazon Personalize and import the relevant data sources: user interactions, user metadata, and product metadata. We'll then choose a recipe that matches our recommendation goal, here user personalization. After creating a solution and training a model with the selected recipe and datasets, we'll deploy it as a campaign and use it to produce personalized product recommendations for individual users.

In [None]:
import time
from time import sleep
import json
from datetime import datetime
import boto3
import pandas as pd
from io import StringIO
%store -r


***Creating a Dataset Group for Personalize***

Configure the AWS SDK to interact with Amazon Personalize, then create a new dataset group named "personalize-product-recommendations" and capture its ARN. The second cell polls the dataset group once a minute, for at most 3 hours, until its status is "ACTIVE" or "CREATE FAILED".

In [None]:
# Create the dataset group

# Configure the SDK to Personalize:
personalize = boto3.client('personalize')
personalize_runtime = boto3.client('personalize-runtime')

create_dataset_group_response = personalize.create_dataset_group(
    name = "personalize-product-recommendations"
)

dataset_group_arn = create_dataset_group_response['datasetGroupArn']
print(json.dumps(create_dataset_group_response, indent=2))



In [None]:
%%time
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    status = describe_dataset_group_response["datasetGroup"]["status"]
    print("DatasetGroup: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)
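
The same polling pattern recurs for every Personalize resource in this notebook, so it could be factored into one helper. The sketch below is one possible shape, not part of the original notebook: `wait_for_status` and its parameters are hypothetical names, and the demonstration uses a scripted stand-in instead of a real `describe_dataset_group` call.

```python
import time


def wait_for_status(get_status, interval_seconds=60, timeout_seconds=3 * 60 * 60,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until it returns a terminal Personalize status.

    get_status is any zero-argument callable that returns the status string.
    Returns the terminal status, or raises TimeoutError when time runs out.
    """
    terminal = {"ACTIVE", "CREATE FAILED"}
    deadline = clock() + timeout_seconds
    while clock() < deadline:
        status = get_status()
        if status in terminal:
            return status
        sleep(interval_seconds)
    raise TimeoutError("resource did not reach a terminal status in time")


# Demonstration with scripted statuses (no AWS call is made here).
_fake_statuses = iter(["CREATE PENDING", "CREATE IN_PROGRESS", "ACTIVE"])
final_status = wait_for_status(lambda: next(_fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

In the notebook, the callable would wrap the describe call, for example `lambda: personalize.describe_dataset_group(datasetGroupArn=dataset_group_arn)["datasetGroup"]["status"]`. Raising on timeout makes a stalled resource visible instead of letting the loop end silently.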

## Creating Schemas and Datasets

***Creating Interactions Schema and Dataset***

Define an interactions schema for Amazon Personalize with the fields TIMESTAMP, USERNAME, USER_ID, ITEM_ID, PRODUCT_NAME, and SENTIMENT. Create the schema in Personalize and retrieve its ARN. Then create an interactions dataset that uses this schema and associate it with the dataset group created earlier.

In [None]:
#create interactions schema
interactions_schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "TIMESTAMP",
            "type": "long"
        },
        {
            "name": "USERNAME",
            "type": "string"
        },
        {
            "name": "USER_ID",
            "type": "string"
        },
        {
            "name": "ITEM_ID",
            "type": "string"
        },
        {
            "name": "PRODUCT_NAME",
            "type": "string"
        },
        {
          "name": "SENTIMENT",
          "type": "string"
        }
        
    ],
    "version": "1.0"
}



create_schema_response = personalize.create_schema(
    name = "personalize-product-recommendations-interactions3786",
    schema = json.dumps(interactions_schema)
)

interaction_schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))

dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
    name = "personalize-product-recommendations-ints-dataset",
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = interaction_schema_arn
)

interactions_dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))
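
Because the CSV column headers are expected to match the schema field names, it can help to compare the two before starting an import. The following is a minimal sketch under that assumption; `missing_schema_fields` is a hypothetical helper, and the sample schema and CSV text are illustrative stand-ins for the real interactions file.

```python
import csv
import io


def missing_schema_fields(schema, csv_text):
    """Return the schema field names that are absent from the CSV header row."""
    header = next(csv.reader(io.StringIO(csv_text)))
    present = {column.strip() for column in header}
    return [field["name"] for field in schema["fields"] if field["name"] not in present]


# Illustrative sample only; the real check would read the first line of the S3 file.
sample_schema = {
    "fields": [
        {"name": "TIMESTAMP", "type": "long"},
        {"name": "USER_ID", "type": "string"},
        {"name": "ITEM_ID", "type": "string"},
        {"name": "SENTIMENT", "type": "string"},
    ]
}
sample_csv = "USER_ID,ITEM_ID,TIMESTAMP\nu1,i9,1700000000\n"
missing = missing_schema_fields(sample_schema, sample_csv)
print(missing)
```

An empty list means every schema field has a matching column; anything else names the columns to add or rename before the import job is created.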


***Creating Users Schema and Dataset***

Define a schema for user data in Amazon Personalize with the fields TIMESTAMP, ITEM_ID, PRODUCT_NAME, RATING, AGE, USER_ID, USERNAME, REVIEW_ID, and REVIEW_TITLE. Then create a users dataset that uses this schema and associate it with the existing dataset group.

In [None]:
# create user schema
users_schema = {
    "type": "record",
    "name": "Users",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "TIMESTAMP",
            "type": "long"
        },
        {
            "name": "ITEM_ID",
            "type": "string"
        },
        {
            "name": "PRODUCT_NAME",
            "type": "string"
        },
        {
            "name": "RATING",
            "type": "string"
        },
        {
            "name": "AGE",
            "type": "int"
        },
        {
            "name": "USER_ID",
            "type": "string"
        },
        {
            "name": "USERNAME",
            "type": "string"
        },
        {
            "name": "REVIEW_ID",
            "type": "string"
        },
        {
            "name": "REVIEW_TITLE",
            "type": "string"
        }
    ],
    "version": "1.0"
}


create_users_schema_response = personalize.create_schema(
    name='personalize-user-recommendations-users',
    schema=json.dumps(users_schema)
)

users_schema_arn = create_users_schema_response['schemaArn']
print(json.dumps(create_users_schema_response, indent=2))



dataset_type = "USERS"
create_dataset_response = personalize.create_dataset(
    name = "personalize-product-recommendations-users-dataset",
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = users_schema_arn
)


users_dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))

***Creating Items Schema and Dataset***

Define a schema for item data in Amazon Personalize with the fields CREATION_TIMESTAMP, ITEM_ID, PRODUCT_NAME, CATEGORY, RATING_COUNT, and DESCRIPTION. CATEGORY is marked as categorical and DESCRIPTION as textual. Then create an items dataset that uses this schema and associate it with the existing dataset group.

In [None]:
items_schema = {
    "type": "record",
    "name": "Items",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "CREATION_TIMESTAMP",
            "type": "long"
        },
        {
            "name": "ITEM_ID",
            "type": "string"
        },
        {
            "name": "PRODUCT_NAME",
            "type": ["null", "string"]
        },
        {
            "name": "CATEGORY",
            "type": ["null", "string"],
            "categorical": True
        },
        {
            "name": "RATING_COUNT",
            "type": ["null", "string"]
        },
        {
            "name": "DESCRIPTION",
            "type": ["null", "string"],
            "textual": True
        }
    ],
    "version": "1.0"
}



create_schema_response = personalize.create_schema(
    name="personalize-product-recommendations-items",
    schema=json.dumps(items_schema)
)

items_schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))

dataset_type = "ITEMS"
create_dataset_response = personalize.create_dataset(
    name="personalize-product-recommendations-items-dataset",
    datasetType=dataset_type,
    datasetGroupArn=dataset_group_arn,
    schemaArn=items_schema_arn
)

items_dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))



## Implementation of IAM Roles and Policies 

***Configure S3 bucket and IAM role for Personalize***

Attach a bucket policy that gives Amazon Personalize access to the data stored in the S3 bucket. Next, create an IAM role whose trust policy allows the Personalize service to assume it.

In [None]:
s3 = boto3.client('s3')

policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:*Object",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket_name),
                "arn:aws:s3:::{}/*".format(bucket_name)
            ]
        }
    ]
}


s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy))
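
The policy above grants every `s3:*Object` action. If Personalize only needs to read import files from the bucket, a narrower policy may be enough. The sketch below assumes a read-only use case (no export jobs writing back to the bucket); `build_personalize_bucket_policy` and the bucket name `example-bucket` are hypothetical.

```python
import json


def build_personalize_bucket_policy(bucket):
    """Build a read-only bucket policy for the Personalize service principal."""
    return {
        "Version": "2012-10-17",
        "Id": "PersonalizeS3BucketAccessPolicy",
        "Statement": [
            {
                "Sid": "PersonalizeS3BucketAccessPolicy",
                "Effect": "Allow",
                "Principal": {"Service": "personalize.amazonaws.com"},
                # Read objects and list the bucket; no write or delete actions.
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [
                    f"arn:aws:s3:::{bucket}",
                    f"arn:aws:s3:::{bucket}/*",
                ],
            }
        ],
    }


policy_document = build_personalize_bucket_policy("example-bucket")
print(json.dumps(policy_document, indent=2))
```

The resulting dictionary can be passed to `s3.put_bucket_policy` through `json.dumps`, exactly as in the cell above.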

In [None]:
iam = boto3.client("iam")

role_name = "PersonalizeRolePOC"
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}

create_role_response = iam.create_role(
    RoleName = role_name,
    AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)

# AmazonPersonalizeFullAccess provides access to any S3 bucket with a name that includes "personalize" or "Personalize" 
# if you would like to use a bucket with a different name, please consider creating and attaching a new policy
# that provides read access to your bucket or attaching the AmazonS3ReadOnlyAccess policy to the role
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
    RoleName = role_name,
    PolicyArn = policy_arn
)

# Now add S3 support
iam.attach_role_policy(
    PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess',
    RoleName=role_name
)
time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

role_arn = create_role_response["Role"]["Arn"]
print(role_arn)

## Importing Datasets into Personalize

Perform dataset import jobs for the interactions dataset, the user dataset, and the item dataset. Each import job specifies the dataset ARN, the location of the data in an S3 bucket, and the IAM role ARN that grants Personalize access to the data.

In [None]:
# interaction dataset import


create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "personalize-product-import",
    datasetArn = interactions_dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket_name, interactions_filename)
    },
    roleArn = role_arn
)

dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

In [None]:
#user dataset import

user_filename = 'user_data.csv'

create_user_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName="personalize-user-import",
    datasetArn=users_dataset_arn,
    dataSource={
        "dataLocation": f"s3://{bucket_name}/{user_filename}"
    },
    roleArn=role_arn
)

user_dataset_import_job_arn = create_user_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_user_dataset_import_job_response, indent=2))



In [None]:
# item dataset import


item_filename = 'product_data.csv'

create_items_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName="personalize-items-import",
    datasetArn=items_dataset_arn,
    dataSource={
        "dataLocation": f"s3://{bucket_name}/{item_filename}"
    },
    roleArn=role_arn
)

items_dataset_import_job_arn = create_items_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_items_dataset_import_job_response, indent=2))



***Monitoring Dataset Import in Amazon Personalize***

We monitor the progress of the interaction, user, and item dataset import jobs in Amazon Personalize, allowing a maximum wait time of 6 hours per job. Each cell loops once a minute, checking the job status with the describe_dataset_import_job API call. The loop ends when the status is "ACTIVE", meaning the job completed successfully, or "CREATE FAILED", in which case the failure reason is printed.


In [None]:
# monitor the interactions dataset import job

max_time = time.time() + 6*60*60 # 6 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_job_arn
    )
    status = describe_dataset_import_job_response["datasetImportJob"]['status']
    print("DatasetImportJob: {}".format(status))
    
    if status == "ACTIVE":
        print("Dataset import job completed successfully.")
        break
    elif status == "CREATE FAILED":
        failure_reason = describe_dataset_import_job_response["datasetImportJob"]['failureReason']
        print(f"Dataset import job failed: {failure_reason}")
        break
        
    time.sleep(60)

In [None]:
# monitor the users dataset import job

max_time = time.time() + 6 * 60 * 60  # 6 hours max wait time
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn=user_dataset_import_job_arn
    )
    status = describe_dataset_import_job_response["datasetImportJob"]['status']
    print(f"DatasetImportJob: {status}")
    
    if status == "ACTIVE":
        print("Dataset import job completed successfully.")
        break
    elif status == "CREATE FAILED":
        failure_reason = describe_dataset_import_job_response["datasetImportJob"]['failureReason']
        print(f"Dataset import job failed: {failure_reason}")
        break
    
    time.sleep(60)

In [None]:
# monitor the items dataset import job


max_time = time.time() + 6 * 60 * 60  # 6 hours max wait time
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn=items_dataset_import_job_arn
    )
    status = describe_dataset_import_job_response["datasetImportJob"]['status']
    print(f"DatasetImportJob: {status}")
    
    if status == "ACTIVE":
        print("Dataset import job completed successfully.")
        break
    elif status == "CREATE FAILED":
        failure_reason = describe_dataset_import_job_response["datasetImportJob"]['failureReason']
        print(f"Dataset import job failed: {failure_reason}")
        break
    
    time.sleep(60)
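
The three monitoring cells run one after another, but the import jobs themselves run concurrently, so a single loop could watch all of them at once. This is a sketch of that idea rather than the notebook's method: `wait_for_all` and `_scripted` are hypothetical names, and the scripted statuses stand in for real `describe_dataset_import_job` responses.

```python
import time


def wait_for_all(status_getters, interval_seconds=60, timeout_seconds=6 * 60 * 60,
                 sleep=time.sleep):
    """Poll several jobs together and return {name: last status seen}."""
    terminal = {"ACTIVE", "CREATE FAILED"}
    pending = dict(status_getters)
    results = {}
    deadline = time.monotonic() + timeout_seconds
    while pending and time.monotonic() < deadline:
        for name in list(pending):
            status = pending[name]()
            results[name] = status
            if status in terminal:
                del pending[name]  # stop polling jobs that have finished
        if pending:
            sleep(interval_seconds)
    return results


def _scripted(statuses):
    """Return a callable that yields the given statuses one call at a time."""
    iterator = iter(statuses)
    return lambda: next(iterator)


# Stand-in status sources; no AWS call is made in this demonstration.
job_statuses = wait_for_all(
    {
        "interactions": _scripted(["CREATE IN_PROGRESS", "ACTIVE"]),
        "users": _scripted(["ACTIVE"]),
        "items": _scripted(["CREATE PENDING", "CREATE IN_PROGRESS", "CREATE FAILED"]),
    },
    sleep=lambda seconds: None,
)
print(job_statuses)
```

In the notebook, each callable would wrap `personalize.describe_dataset_import_job` for one of the three job ARNs and return the `status` field. Any job still pending at the deadline keeps its last non-terminal status in the result.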

***Creating Solution and Solution Version***

We use the "aws-user-personalization-v2" recipe, which is designed for user personalization: it generates recommendations for each user from their interactions with items in the dataset. AWS describes the v2 recipes as built on a Transformer-based architecture. After choosing the recipe, we create a solution associated with the dataset group created earlier, then train a solution version from it. Training the solution version may take up to an **_hour_**.

In [None]:
recipe_arn = "arn:aws:personalize:::recipe/aws-user-personalization-v2"

In [None]:
create_solution_response = personalize.create_solution(
    name = "personalize-soln-user-personalization",
    datasetGroupArn = dataset_group_arn,
    recipeArn = recipe_arn
)

solution_arn = create_solution_response['solutionArn']
print(json.dumps(create_solution_response, indent=2))


In [None]:
create_solution_version_response = personalize.create_solution_version(
    solutionArn = solution_arn
)

solution_version_arn = create_solution_version_response['solutionVersionArn']
print(json.dumps(create_solution_version_response, indent=2))

In [None]:
%%time
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_solution_version_response = personalize.describe_solution_version(
        solutionVersionArn = solution_version_arn
    )
    status = describe_solution_version_response["solutionVersion"]["status"]
    print("SolutionVersion: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

***Creating and Monitoring Personalize Campaign***

We create a campaign to serve real-time personalized recommendations to users. The campaign ARN returned by the create_campaign API is what we pass to the Personalize Runtime API to get recommendations for a specific user. Here we set the minimum provisioned transactions per second (TPS), which determines the minimum level of traffic the campaign can handle. Depending on the recipe, the campaign configuration can also include item exploration settings, which balance exploration of new items against exploitation of known user preferences; this notebook leaves them at their defaults. The campaign may take up to **_15 minutes_** to become active.

In [None]:
create_campaign_response = personalize.create_campaign(
    name="personalize-recs1",
    solutionVersionArn=solution_version_arn,
    minProvisionedTPS=1
)

campaign_arn = create_campaign_response['campaignArn']
print(json.dumps(create_campaign_response, indent=2))

max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_campaign_response = personalize.describe_campaign(
        campaignArn=campaign_arn
    )
    status = describe_campaign_response["campaign"]["status"]
    print("Campaign: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)


## Generating and Saving Personalize Recommendations

Using the interactions DataFrame restored at the start of the notebook, we first create a mapping of item IDs to product names and a mapping of user IDs to usernames. We then iterate through every unique user ID in the interactions data and call the get_recommendations API from the Personalize Runtime against the campaign created earlier. The recommended item IDs are mapped to their product names, and the top five for each user are stored in a list of dictionaries, which is converted to a DataFrame and saved as a CSV file.


In [None]:
items_map = dict(zip(interactions_df['ITEM_ID'].astype(str), interactions_df['PRODUCT_NAME']))
user_mapping = interactions_df.set_index('USER_ID')['USERNAME'].to_dict()
all_recommendations = []

# Group interactions by USER_ID
grouped = interactions_df.groupby('USER_ID')

for user_id, group in grouped:
    get_recommendations_response = personalize_runtime.get_recommendations(
        campaignArn=campaign_arn,
        userId=str(user_id)
    )
    
    recommendation_list = [item['itemId'] for item in get_recommendations_response['itemList']]
    recommendation_names = [items_map.get(item_id, 'Unknown') for item_id in recommendation_list]
    latest_timestamp_value = group['TIMESTAMP'].max()
    username = user_mapping.get(user_id, 'Unknown')
    
    # Append the recommendation details for this user
    all_recommendations.append({
        'USER_ID': user_id,
        'Username': username,
        'Recommended_Items': recommendation_names[:5],  # Limit to top 5 recommendations
        'TIMESTAMP': latest_timestamp_value
    })

# Convert to DataFrame
recommendations_df = pd.DataFrame(all_recommendations)

# Save to CSV
csv_filename = 'user_recommendations.csv'
recommendations_df.to_csv(csv_filename, index=False)
print(f"Recommendations DataFrame saved to {csv_filename}")

print(recommendations_df)
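
The ID-to-name mapping step can be isolated as a small pure function, which makes it easy to check without calling the campaign. This is an illustrative sketch: `top_recommendation_names` is a hypothetical helper, and the sample item list and product names are invented stand-ins for a real `get_recommendations` response.

```python
def top_recommendation_names(item_list, items_map, limit=5, default="Unknown"):
    """Translate a get_recommendations itemList into at most `limit` names."""
    names = []
    for entry in item_list[:limit]:
        # Item IDs are compared as strings, matching the keys of items_map.
        names.append(items_map.get(str(entry["itemId"]), default))
    return names


# Invented sample data, for illustration only.
sample_item_list = [{"itemId": "B01"}, {"itemId": "B02"}, {"itemId": "B99"}]
sample_items_map = {"B01": "USB-C Cable", "B02": "Wireless Mouse"}
names = top_recommendation_names(sample_item_list, sample_items_map, limit=3)
print(names)
```

Items missing from the mapping come back as "Unknown", which is the same fallback the loop above uses.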

Sample a random row from the DataFrame to cross-check a user against their five recommendations.

In [None]:
random_row = recommendations_df.sample(n=1).iloc[0]

user_id = random_row['USER_ID']
username = random_row['Username']
recommended_items = random_row['Recommended_Items']

print(f"Randomly Selected User ID: {user_id}")
print(f"Username: {username}")
print("Recommended Items:")
for item in recommended_items:
    print(f" - {item}")

***Storing Personalized Recommendations in Amazon DynamoDB***

We create a DynamoDB table named "user_recommendations" if it doesn't already exist, then insert the personalized recommendations generated in the previous step. Each item uses the user ID as the hash (partition) key, so the recommendations for a specific user can be retrieved with a single key lookup.

In [None]:
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
table_name = 'user_recommendations'

# Create the DynamoDB table if it doesn't exist
try:
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[
            {
                'AttributeName': 'USER_ID',
                'KeyType': 'HASH'  # Partition key
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'USER_ID',
                'AttributeType': 'S'  # String type for USER_ID
            }
        ],
        BillingMode='PAY_PER_REQUEST'
    )
    print(f"Table '{table_name}' created. Status: {table.table_status}")
except ClientError as e:
    if e.response['Error']['Code'] == 'ResourceInUseException':
        # The table already exists, so reuse it
        table = dynamodb.Table(table_name)
        print(f"Table '{table_name}' already exists.")
    else:
        # Any other error would leave `table` undefined, so re-raise it
        raise

# Wait until the table is active
table.wait_until_exists()

# Write every row of the DataFrame to the DynamoDB table.
# recommendations_df holds one row per user, so a single batch writer is enough.
with table.batch_writer() as batch:
    for _, row in recommendations_df.iterrows():
        batch.put_item(Item={
            'USER_ID': str(row['USER_ID']),
            'Username': row['Username'],
            'Recommended_Items': row['Recommended_Items'],
            'TIMESTAMP': str(row['TIMESTAMP'])
        })

print(f"Recommendations written to DynamoDB table: {table_name}")
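
Reading a user's recommendations back is a single `get_item` call on the partition key. The sketch below shows one way to wrap that lookup; `fetch_recommendations` and `_InMemoryTable` are hypothetical names, and the in-memory stand-in with its sample row exists only so the example runs without AWS.

```python
def fetch_recommendations(table, user_id):
    """Return the stored recommendation names for one user, or [] if absent."""
    response = table.get_item(Key={"USER_ID": str(user_id)})
    item = response.get("Item")
    return list(item["Recommended_Items"]) if item else []


class _InMemoryTable:
    """Stand-in exposing only get_item, mimicking the DynamoDB Table resource."""

    def __init__(self, rows):
        self._rows = {row["USER_ID"]: row for row in rows}

    def get_item(self, Key):
        row = self._rows.get(Key["USER_ID"])
        # Like DynamoDB, omit the "Item" key when nothing matches.
        return {"Item": row} if row else {}


# Invented sample row, for illustration only.
demo_table = _InMemoryTable(
    [{"USER_ID": "u1", "Username": "alice", "Recommended_Items": ["Item A", "Item B"]}]
)
found = fetch_recommendations(demo_table, "u1")
absent = fetch_recommendations(demo_table, "u2")
print(found, absent)
```

With the real table, the call would be `fetch_recommendations(table, user_id)` using the `table` object from the cell above.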

***Store variables for future use***

In [None]:
%store bucket_name
%store recommendations_df
%store dataset_group_arn
%store interaction_schema_arn
%store users_schema_arn
%store items_schema_arn
%store users_dataset_arn
%store items_dataset_arn
%store interactions_dataset_arn
%store role_name
%store role_arn
%store interactions_df
%store user_filename
%store solution_version_arn
%store solution_arn
%store campaign_arn