# Amazon Personalize Movie Recommendation

# Import required Python modules and define variables

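We define the name of the Amazon S3 bucket that will store the sample dataset, along with names (variables) for the dataset, schema, import job, solution, IAM role and Amazon Personalize campaign. All names are derived from a single `alias`, so replace `<yourname>` before running the cell.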

In [None]:
import json
from datetime import datetime
import time
import pytz
import boto3
import pandas as pd
import numpy as np
from sklearn.utils import shuffle

# Prefix shared by every resource name created in this notebook.
alias = "<yourname>-pinpoint-workshop" # replace <yourname>

# All resource names are derived from the alias so that they stay unique per user.
bucket = "sagemaker-personalize-" + alias
dataset_name = alias + "-dataset"
schema_name = alias + "-schema"
import_job_name = alias + "-import"
campaign_name = alias + "-campaign"
personalize_role_name = alias + "-role"
solution_name = alias + "-solution"

Next, we download and extract the sample movie dataset and load it as tab-separated input with the pandas library. The DataFrame display is limited to five rows, so only a truncated preview of the data is shown.

In [None]:
!wget -N http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -o ml-100k.zip
data = pd.read_csv('./ml-100k/u.data', sep='\t', names=['USER_ID', 'ITEM_ID', 'RATING', 'TIMESTAMP'])
pd.set_option('display.max_rows', 5)
data

Register the Python AWS SDK (boto3) for Amazon S3 and create a bucket.

In [None]:
# Create the private S3 bucket that will hold the training data.
s3client = boto3.client('s3')

# The LocationConstraint must match the region the request is sent to, so take it from the
# active session and fall back to the workshop default when no region is configured.
region = boto3.session.Session().region_name or 'us-west-2'

create_bucket_args = {'ACL': 'private', 'Bucket': bucket}
# us-east-1 is the default S3 location and must not be passed as a LocationConstraint.
if region != 'us-east-1':
    create_bucket_args['CreateBucketConfiguration'] = {'LocationConstraint': region}

bucket_response = s3client.create_bucket(**create_bucket_args)
print(bucket_response["ResponseMetadata"])

In [None]:
filename = "ratings.processed.csv"              # file in S3 that will hold our model training data

# Number of seconds added to every timestamp so that the reviews end on 1st April 2019
# rather than 23rd April 1998.
TIMESTAMP_SHIFT_SECONDS = 660833618

# Build the training frame in one chain: keep rows rated above 3.6, keep only the columns
# that match the schema below, and shift the timestamps. Selecting with .loc and using
# .assign yields a new frame, so no column is assigned on a filtered slice
# (which would trigger pandas' SettingWithCopyWarning).
# Note: `data` is replaced by the processed frame, so re-run the load cell before
# re-running this one.
data = (
    data.loc[data['RATING'] > 3.6, ['USER_ID', 'ITEM_ID', 'TIMESTAMP']]
    .assign(TIMESTAMP=lambda frame: frame['TIMESTAMP'] + TIMESTAMP_SHIFT_SECONDS)
)
data.to_csv(filename, index=False)

# Upload the CSV to the bucket under the same key as the local file name.
boto3.Session().resource('s3').Bucket(bucket).Object(filename).upload_file(filename)

We initialize the Amazon Personalize clients with boto3 (one for managing resources, one for requesting recommendations) and create the dataset group in which the dataset and solution will be created.

In [None]:
# One client for managing Personalize resources, one for requesting recommendations.
personalize_client = boto3.client('personalize')
personalize_runtime = boto3.client('personalize-runtime')

# Create the dataset group (named after dataset_name) and keep its ARN for later calls.
dataset_group_resp = personalize_client.create_dataset_group(name=dataset_name)
dataset_group = dataset_group_resp['datasetGroupArn']

We'll define a Schema according to the data we processed in the steps above

In [None]:
# One schema field per column of the processed CSV, in column order.
interaction_fields = [
    {"name": field_name, "type": field_type}
    for field_name, field_type in (
        ("USER_ID", "string"),
        ("ITEM_ID", "string"),
        ("TIMESTAMP", "long"),
    )
]

schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": interaction_fields,
    "version": "1.0",
}

# The schema is passed to Personalize as a JSON string.
schema_json = json.dumps(schema)
schema_resp = personalize_client.create_schema(name=schema_name, schema=schema_json)
schema_arn = schema_resp['schemaArn']

# Pause for 30 seconds before the following cells use the schema.
time.sleep(30)

Create the Dataset using the previously created and defined schema.

In [None]:
# Create an INTERACTIONS dataset in the dataset group, bound to the schema ARN.
dataset_settings = {
    'name': dataset_name,
    'schemaArn': schema_arn,
    'datasetGroupArn': dataset_group,
    'datasetType': 'INTERACTIONS',
}
dataset_resp = personalize_client.create_dataset(**dataset_settings)
dataset = dataset_resp['datasetArn']

Attach a bucket policy to the created S3 bucket so that Amazon Personalize can list the bucket and read the training data stored in it.

In [None]:
# Statement allowing the Personalize service principal to list the bucket and read its objects.
bucket_arn = "arn:aws:s3:::{}".format(bucket)
bucket_objects_arn = "arn:aws:s3:::{}/*".format(bucket)
personalize_read_statement = {
    "Sid": "PersonalizeS3BucketAccessPolicy",
    "Effect": "Allow",
    "Principal": {"Service": "personalize.amazonaws.com"},
    "Action": ["s3:GetObject", "s3:ListBucket"],
    "Resource": [bucket_arn, bucket_objects_arn],
}

policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [personalize_read_statement],
}

# The policy is sent as a JSON string; the trailing semicolon hides the response in the cell output.
s3client.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy));

Define an IAM role for the Personalize import job of the dataset

In [None]:
iam = boto3.client("iam")
role_name = personalize_role_name

# Trust policy: allows the Personalize service to assume the role.
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "personalize.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

trust_policy_json = json.dumps(assume_role_policy_document)
create_role_response = iam.create_role(
    RoleName=role_name,
    AssumeRolePolicyDocument=trust_policy_json,
)

# AmazonPersonalizeFullAccess provides access to any S3 bucket with a name that includes
# "personalize" or "Personalize".  If you would like to use a bucket with a different name,
# please consider creating and attaching a new policy that provides read access to your
# bucket or attaching the AmazonS3ReadOnlyAccess policy to this role
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

# Keep the role ARN for the import job, then pause for 10 seconds before it is used.
role_arn = create_role_response["Role"]["Arn"]
print(role_arn)
time.sleep(10)

Run the actual import Job of the dataset into Amazon Personalize from Amazon S3 with the processed data.

In [None]:
# Location of the processed ratings file uploaded to the bucket.
import_source = {'dataLocation': 's3://{}/ratings.processed.csv'.format(bucket)}

# Start the import into the dataset, using the role created for Personalize.
dataset_import_resp = personalize_client.create_dataset_import_job(
    jobName=import_job_name,
    datasetArn=dataset,
    dataSource=import_source,
    roleArn=role_arn,
)
dataset_import_arn = dataset_import_resp['datasetImportJobArn']

Wait until the dataset import job is done, this can take a while depending on the dataset size. The output will refresh every minute with the current status until it's finished.

In [None]:
# Poll the import job once a minute until it is ACTIVE, has failed, or three hours have passed.
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize_client.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_arn
    )

    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    now = datetime.now(pytz.utc)
    elapsed = now - dataset_import_job["creationDateTime"]
    # Prefer the status of the latest job run when the response carries one.
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}   (elapsed = {})".format(status, elapsed))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))

    if status in ("ACTIVE", "CREATE FAILED"):
        break

    time.sleep(60)

# Stop the notebook here instead of letting later cells run against an import that
# failed or did not finish within the time limit.
if status != "ACTIVE":
    raise RuntimeError(
        "Dataset import job did not become ACTIVE (last status: {})".format(status)
    )

Provide a list of pre-defined Amazon Personalize recipes. A recipe is a machine learning algorithm or algorithm variant that you use with settings, or hyperparameters, and a dataset group to train an Amazon Personalize model. With recipes, you can create a personalization system without prior machine learning experience. You can find more information [here](https://docs.aws.amazon.com/personalize/latest/dg/working-with-predefined-recipes.html) - we'll provide a list and select the first one: `aws-hrnn` for our use-case.

In [None]:
# Names of the predefined recipes; each ARN is the common prefix followed by the name.
recipe_names = (
    "aws-hrnn",
    "aws-hrnn-coldstart",
    "aws-hrnn-metadata",
    "aws-personalized-ranking",
    "aws-popularity-count",
    "aws-sims",
)
recipe_list = ["arn:aws:personalize:::recipe/" + recipe_name for recipe_name in recipe_names]

# The first entry (aws-hrnn) is the recipe used for this solution.
recipe_arn = recipe_list[0]

Next, we'll create an Amazon Personalize Solution with our dataset group and the selected recipe.

In [None]:
# Create the solution in the dataset group with the selected recipe.
solution_settings = {
    'name': solution_name,
    'datasetGroupArn': dataset_group,
    'recipeArn': recipe_arn,
}
create_solution_response = personalize_client.create_solution(**solution_settings)

# Keep the solution ARN for creating a solution version, and show the full response.
solution_arn = create_solution_response['solutionArn']
print(json.dumps(create_solution_response, indent=2))

Each solution requires a solution version. Creating one can take a while; the status cell below prints the current status every minute until the solution version is ready.

In [None]:
# Request a new version of the solution and keep its ARN for the status check and the campaign.
version_request = {'solutionArn': solution_arn}
create_solution_version_response = personalize_client.create_solution_version(**version_request)

solution_version_arn = create_solution_version_response['solutionVersionArn']
print(json.dumps(create_solution_version_response, indent=2))

In [None]:
# Poll the solution version once a minute until it is ACTIVE, has failed, or three hours have passed.
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_solution_version_response = personalize_client.describe_solution_version(
        solutionVersionArn = solution_version_arn
    )
    status = describe_solution_version_response["solutionVersion"]["status"]
    now = datetime.now(pytz.utc)
    elapsed = now - describe_solution_version_response["solutionVersion"]["creationDateTime"]
    print("SolutionVersion: {}   (elapsed = {})".format(status, elapsed))

    if status in ("ACTIVE", "CREATE FAILED"):
        break

    time.sleep(60)

# Stop the notebook here instead of letting later cells run against a solution version
# that failed or did not finish within the time limit.
if status != "ACTIVE":
    raise RuntimeError(
        "Solution version did not become ACTIVE (last status: {})".format(status)
    )

Amazon Personalize generates a number of metrics when it creates a solution version. These metrics allow you to evaluate the performance of the solution version before you create a campaign and provide recommendations. Metrics allow you to view the effects of modifying a solution's hyperparameters. You can also compare the metrics between solutions that use the same training data but created with different recipes. 

In [None]:
# Fetch the metrics of the solution version and print them as indented JSON.
metrics_request = {'solutionVersionArn': solution_version_arn}
get_solution_metrics_response = personalize_client.get_solution_metrics(**metrics_request)

print(json.dumps(get_solution_metrics_response, indent=2))

Now we create an Amazon Personalize Campaign based on the dataset and solution version created earlier. This campaign is used to provide recommendations.

In [None]:
# Create the campaign that serves recommendations from the solution version.
# NOTE(review): minProvisionedTPS is set to 10 - confirm this is the intended value for a workshop.
campaign_settings = {
    'name': campaign_name,
    'solutionVersionArn': solution_version_arn,
    'minProvisionedTPS': 10,
}
create_campaign_response = personalize_client.create_campaign(**campaign_settings)

# Keep the campaign ARN for the recommendation requests, and show the full response.
campaign_arn = create_campaign_response['campaignArn']
print(json.dumps(create_campaign_response, indent=2))

This process will also run for a while, the current status is updated every 30 seconds until finished.

In [None]:
# Poll the campaign every 30 seconds until it is ACTIVE, has failed, or three hours have passed.
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_campaign_response = personalize_client.describe_campaign(
        campaignArn = campaign_arn
    )
    status = describe_campaign_response["campaign"]["status"]
    now = datetime.now(pytz.utc)
    elapsed = now - describe_campaign_response["campaign"]["creationDateTime"]
    print("Campaign: {}   (elapsed = {})".format(status, elapsed))

    if status in ("ACTIVE", "CREATE FAILED"):
        break

    time.sleep(30)

# Stop the notebook here instead of requesting recommendations from a campaign
# that failed or did not finish within the time limit.
if status != "ACTIVE":
    raise RuntimeError(
        "Campaign did not become ACTIVE (last status: {})".format(status)
    )

Now that the campaign is active and able to provide recommendations, we can request recommendations from Amazon Personalize.

In [None]:
personalize_runtime = boto3.client('personalize-runtime')

# Pick a random user from the interactions data for this first lookup. No user has been
# selected before this point, so the cell must choose one itself to work on a fresh
# top-to-bottom run.
user_id = data['USER_ID'].sample().iloc[0]

# The user ID is sent as a string, matching the USER_ID type declared in the schema.
get_recommendations_response = personalize_runtime.get_recommendations(
    campaignArn = campaign_arn,
    userId = str(user_id)
)

item_list = get_recommendations_response['itemList']
print("Recommendations: {}".format(json.dumps(item_list, indent=2)))

To use Amazon Personalize together with AWS Amplify, the Amplify project needs a tracking ID. The following cell creates an event tracker and prints its tracking ID:

In [None]:
# Create an event tracker for the dataset group, named after the alias.
tracker_name = "{}-eventTracker".format(alias)
event_tracker_response = personalize_client.create_event_tracker(
    name=tracker_name,
    datasetGroupArn=dataset_group,
)

# The response carries both the tracker ARN and the tracking ID.
event_tracker_arn = event_tracker_response['eventTrackerArn']
event_tracking_id = event_tracker_response['trackingId']

print(event_tracker_arn)
print("Tracking ID for your Amplify project: {}".format(event_tracking_id))

In [None]:
# Load the movie titles. The archive is extracted into ./ml-100k (the ratings file is read
# from there as well); u.item is pipe-delimited and Latin-1 encoded, so both must be
# given explicitly. Only the first two columns (item ID and title) are needed.
items = pd.read_csv('./ml-100k/u.item', sep='|', usecols=[0,1], header=None, encoding='latin-1')
items.columns = ['ITEM_ID', 'TITLE']

# Pick one random interaction; `data` holds USER_ID, ITEM_ID and TIMESTAMP at this point.
user_id, item_id, _ = data.sample().values[0]
# The title is the last column of the matching items row.
item_title = items.loc[items['ITEM_ID'] == item_id].values[0][-1]
print("USER: {}".format(user_id))
print("ITEM: {}".format(item_title))
print("ITEM ID: {}".format(item_id))

print(items.loc[items['ITEM_ID'] == item_id])

items

In [None]:
# Request recommendations for a fixed example user once and reuse the response for both
# the raw item IDs and the title lookup below.
response = personalize_runtime.get_recommendations(
    campaignArn = campaign_arn,
    userId = '402')

print("Recommended items")
for item in response['itemList']:
    print(item['itemId'])

# Same request as above, so the response is reused instead of calling the service again.
get_recommendations_response = response

item_list = get_recommendations_response['itemList']
# The item IDs are converted with the built-in int (np.int has been removed from NumPy)
# so they can be compared with the ITEM_ID column; the title is the last column of the row.
title_list = [items.loc[items['ITEM_ID'] == int(item['itemId'])].values[0][-1] for item in item_list]

print("Recommendations: {}".format(json.dumps(title_list, indent=2)))

In [None]:
# Show the ARN of the campaign created earlier in this notebook.
print(campaign_arn)