In [1]:
import boto3

import json
import numpy as np
import pandas as pd
import time

personalize = boto3.client(service_name='personalize')
personalize_runtime = boto3.client(service_name='personalize-runtime')

In [2]:
## Set up S3 bucket

bucket = "cornflex-upyourgame"
filename = 'interactions_data.csv'
userfilename = 'user_data.csv'

s3 = boto3.client('s3')
if boto3.resource('s3').Bucket(bucket).creation_date is None:
    s3.create_bucket(ACL = "private", Bucket = bucket)
    print("Creating bucket: {}".format(bucket))


In [4]:
## Generate user-interactions data and dump to S3

import random
import math

# Rationale: Based on event_value, which is correlated with interest level for each item (aka course),
# recommend the course (item_id) to the user 

data = {
    'USER_ID': np.arange(10000,20000),
    'AGE': [np.round(random.normalvariate(30,10),0) for i in range(10000)],
    'EDUCATION_LEVEL': [random.choice(["Secondary", "JC", "Poly", "University", "Postgrad"])  for i in range(10000)],
    'TIMESTAMP': [random.randint(1568143512, 1569093912) for i in range(10000)], # https://www.epochconverter.com/
    'ITEM_ID': [random.randint(1,11) for i in range(10000)], # 10 courses
    'EVENT_TYPE': [random.choice(["Click", "Hover", "Load", "Register", "Complete"])  for i in range(10000)],
    'EVENT_VALUE': [random.uniform(0.3,1) for i in range(10000)] # event_value indicates different types of event types and their interests
}

data = pd.DataFrame(data)
print(data.head())
interactions_data = data[data['EVENT_VALUE'] > 0.5] # only sieve those above 0.5 interaction values 
interactions_data = interactions_data[['USER_ID', 'ITEM_ID', 'TIMESTAMP', 'EVENT_TYPE', 'EVENT_VALUE']]
print('')
print(interactions_data.head())

user_data = data[['USER_ID', 'AGE', 'EDUCATION_LEVEL']]
print('')
print(user_data.head())

# save interactions 
interactions_data.to_csv(filename, index=False)
boto3.Session().resource('s3').Bucket(bucket).Object(filename).upload_file(filename)

# save user data
user_data.to_csv(userfilename, index=False)
boto3.Session().resource('s3').Bucket(bucket).Object(userfilename).upload_file(userfilename)

   USER_ID   AGE EDUCATION_LEVEL   TIMESTAMP  ITEM_ID EVENT_TYPE  EVENT_VALUE
0    10000  22.0       Secondary  1568601959        9      Hover     0.891973
1    10001  30.0       Secondary  1569004851        9       Load     0.978170
2    10002  50.0       Secondary  1568202917        5      Hover     0.757932
3    10003  38.0       Secondary  1568426832       10   Complete     0.783655
4    10004  35.0        Postgrad  1568776795        6      Hover     0.538509

   USER_ID  ITEM_ID   TIMESTAMP EVENT_TYPE  EVENT_VALUE
0    10000        9  1568601959      Hover     0.891973
1    10001        9  1569004851       Load     0.978170
2    10002        5  1568202917      Hover     0.757932
3    10003       10  1568426832   Complete     0.783655
4    10004        6  1568776795      Hover     0.538509

   USER_ID   AGE EDUCATION_LEVEL
0    10000  22.0       Secondary
1    10001  30.0       Secondary
2    10002  50.0       Secondary
3    10003  38.0       Secondary
4    10004  35.0        Postg

In [10]:
interactions_schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "USER_ID",
            "type": "string"
        },
        {
            "name": "ITEM_ID",
            "type": "string"
        },
        {
            "name": "TIMESTAMP",
            "type": "long"
        },
        {
            "name": "EVENT_TYPE",
            "type": "string"
        },
        {
            "name": "EVENT_VALUE",
            "type": "float"
        }
    ],
    "version": "1.0"
}

create_schema_response = personalize.create_schema(
    name = "interactions-schema-test",
    schema = json.dumps(interactions_schema)
)

interactions_schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))

user_schema = {
    "type": "record",
    "name": "Users",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "USER_ID",
            "type": "string"
        },
        {
            "name": "AGE",
            "type": "int"
        },
        {
            "name": "EDUCATION_LEVEL",
            "type": "string"
        }
    ],
    "version": "1.0"
}

create_schema_response = personalize.create_schema(
    name = "user-schema-test",
    schema = json.dumps(user_schema)
)

users_schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))


{
  "schemaArn": "arn:aws:personalize:us-east-1:545724157525:schema/interactions-schema-test",
  "ResponseMetadata": {
    "RequestId": "178c33cc-2272-4d4a-ae42-8f075cfbc5b3",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "content-type": "application/x-amz-json-1.1",
      "date": "Sat, 21 Sep 2019 22:06:02 GMT",
      "x-amzn-requestid": "178c33cc-2272-4d4a-ae42-8f075cfbc5b3",
      "content-length": "90",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}
{
  "schemaArn": "arn:aws:personalize:us-east-1:545724157525:schema/user-schema-test",
  "ResponseMetadata": {
    "RequestId": "86c6954e-c004-4ecd-9fb8-10ede09df24e",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "content-type": "application/x-amz-json-1.1",
      "date": "Sat, 21 Sep 2019 22:06:02 GMT",
      "x-amzn-requestid": "86c6954e-c004-4ecd-9fb8-10ede09df24e",
      "content-length": "82",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}


In [7]:
## Create and wait for dataset group 

create_dataset_group_response = personalize.create_dataset_group(
    name = "personalize-recommender-datasetgroup"
)

dataset_group_arn = create_dataset_group_response['datasetGroupArn']
print(json.dumps(create_dataset_group_response, indent=2))


# Wait for ACTIVE STATUS

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    status = describe_dataset_group_response["datasetGroup"]["status"]
    print("DatasetGroup: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(15)

{
  "datasetGroupArn": "arn:aws:personalize:us-east-1:545724157525:dataset-group/personalize-recommender-datasetgroup",
  "ResponseMetadata": {
    "RequestId": "993c92d5-cefa-4bfa-a415-1ea6856d82cb",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "content-type": "application/x-amz-json-1.1",
      "date": "Sat, 21 Sep 2019 22:01:55 GMT",
      "x-amzn-requestid": "993c92d5-cefa-4bfa-a415-1ea6856d82cb",
      "content-length": "115",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}
DatasetGroup: CREATE PENDING
DatasetGroup: ACTIVE


In [11]:
# Now create dataset 

dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = interactions_schema_arn,
    name="personalize-recommender-test-interactions"
)

dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))


dataset_type = "USERS"
create_dataset_response = personalize.create_dataset(
    name = "personalize-recommender-test-users",
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = users_schema_arn)

# Get the ARN
dataset_arn_u = create_dataset_response['datasetArn']
print(dataset_arn_u)

{
  "datasetArn": "arn:aws:personalize:us-east-1:545724157525:dataset/personalize-recommender-datasetgroup/INTERACTIONS",
  "ResponseMetadata": {
    "RequestId": "b1208ae2-dc95-4ac6-aefc-7f065daa3267",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "content-type": "application/x-amz-json-1.1",
      "date": "Sat, 21 Sep 2019 22:06:42 GMT",
      "x-amzn-requestid": "b1208ae2-dc95-4ac6-aefc-7f065daa3267",
      "content-length": "117",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}
arn:aws:personalize:us-east-1:545724157525:dataset/personalize-recommender-datasetgroup/USERS


In [12]:
# Prepare, create, and wait for dataset import job

s3 = boto3.client("s3")

policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))

{'ResponseMetadata': {'RequestId': 'AA8EAA8EE149F200',
  'HostId': 'dnjxGzU8Ui3N90CGUBKqkx1pm+p+ZaNlmre50/nOU1G09RrOnaexCrBvKEN51aT6tBKpaJtHFVg=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': 'dnjxGzU8Ui3N90CGUBKqkx1pm+p+ZaNlmre50/nOU1G09RrOnaexCrBvKEN51aT6tBKpaJtHFVg=',
   'x-amz-request-id': 'AA8EAA8EE149F200',
   'date': 'Sat, 21 Sep 2019 22:07:02 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}

In [13]:
# Create a personalize role 
iam = boto3.client("iam")

role_name = "PersonalizeRoleTest"
role_description = "Allows Amazon Personalize to call AWS services on your behalf."

assume_role_policy_doc = {
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "personalize.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

response = iam.create_role(RoleName=role_name, Description=role_description, AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc))
role_arn = response["Role"]["Arn"]
role_arn

EntityAlreadyExistsException: An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name PersonalizeRoleTest already exists.

In [14]:
persoanlize_role_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"

bucket_policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
            ]
        }
    ]
}
iam.attach_role_policy(RoleName=role_name, PolicyArn=persoanlize_role_arn)
iam.put_role_policy(RoleName=role_name, PolicyName="bucket_permission", PolicyDocument=json.dumps(bucket_policy))

{'ResponseMetadata': {'RequestId': '30931abc-dcbc-11e9-b8b3-c1ec48176a52',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '30931abc-dcbc-11e9-b8b3-c1ec48176a52',
   'content-type': 'text/xml',
   'content-length': '206',
   'date': 'Sat, 21 Sep 2019 22:07:23 GMT'},
  'RetryAttempts': 0}}

In [17]:
create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "interactions-data-import-job",
    datasetArn = dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, filename)
    },
    roleArn = 'arn:aws:iam::545724157525:role/PersonalizeRoleTest'
)

dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))



{
  "datasetImportJobArn": "arn:aws:personalize:us-east-1:545724157525:dataset-import-job/interactions-data-import-job",
  "ResponseMetadata": {
    "RequestId": "59606c6d-4c8b-49b6-87c6-679a9d7a7bad",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "content-type": "application/x-amz-json-1.1",
      "date": "Sat, 21 Sep 2019 22:12:02 GMT",
      "x-amzn-requestid": "59606c6d-4c8b-49b6-87c6-679a9d7a7bad",
      "content-length": "116",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}


In [18]:
# Finally, wait for dataset import job to have ACTIVE STATUS

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_job_arn
    )
    
    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)
    


DatasetImportJob: CREATE PENDING
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: ACTIVE


In [19]:
# Repeat for user data 

create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "users-data-import-job",
    datasetArn = dataset_arn_u,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, userfilename)
    },
    roleArn = 'arn:aws:iam::545724157525:role/PersonalizeRoleTest'
)

dataset_import_job_arn_u = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

# Finally, wait for dataset import job to have ACTIVE STATUS

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_job_arn_u
    )
    
    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

InvalidInputException: An error occurred (InvalidInputException) when calling the CreateDatasetImportJob operation: Input csv has rows that do not conform to the dataset schema. Please ensure all required data fields are present and that they are of the type specified in the schema.

In [20]:
# SELECT PERSONALIZE RECIPE!

recipe_list = [
    "arn:aws:personalize:::recipe/aws-hrnn",
    "arn:aws:personalize:::recipe/aws-hrnn-coldstart",
    "arn:aws:personalize:::recipe/aws-hrnn-metadata",
    "arn:aws:personalize:::recipe/aws-personalized-ranking",
    "arn:aws:personalize:::recipe/aws-popularity-count",
    "arn:aws:personalize:::recipe/aws-sims"
]

recipe_arn = recipe_list[0] # selecting hrnn only for now 
print(recipe_arn)

arn:aws:personalize:::recipe/aws-hrnn


In [21]:
# Create solution

create_solution_response = personalize.create_solution(
    name = "personalize-interactions-solution",
    datasetGroupArn = dataset_group_arn,
    recipeArn = recipe_arn # this can be removed and use performAutoML = True as well 
)

solution_arn = create_solution_response['solutionArn']
print(json.dumps(create_solution_response, indent=2))

# Create solution version 
create_solution_version_response = personalize.create_solution_version(
    solutionArn = solution_arn
)

solution_version_arn = create_solution_version_response['solutionVersionArn']
print(json.dumps(create_solution_version_response, indent=2))

{
  "solutionArn": "arn:aws:personalize:us-east-1:545724157525:solution/personalize-interactions-solution",
  "ResponseMetadata": {
    "RequestId": "fe58bf34-7a29-4e8e-81e2-17da4108a8d0",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "content-type": "application/x-amz-json-1.1",
      "date": "Sun, 22 Sep 2019 02:09:50 GMT",
      "x-amzn-requestid": "fe58bf34-7a29-4e8e-81e2-17da4108a8d0",
      "content-length": "103",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}


InvalidInputException: An error occurred (InvalidInputException) when calling the CreateSolutionVersion operation: The following dataset does not have any successfully completed dataset import jobs: [arn:aws:personalize:us-east-1:545724157525:dataset/personalize-recommender-datasetgroup/USERS]

In [None]:
# And wait for the solution to have ACTIVE STATUS

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_solution_version_response = personalize.describe_solution_version(
        solutionVersionArn = solution_version_arn
    )
    status = describe_solution_version_response["solutionVersion"]["status"]
    print("SolutionVersion: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)
    

In [None]:
# Get metrics of solution

get_solution_metrics_response = personalize.get_solution_metrics(
    solutionVersionArn = solution_version_arn
)

print(json.dumps(get_solution_metrics_response, indent=2))

In [None]:
## CREATE AND WAIT for CAMPAIGN

create_campaign_response = personalize.create_campaign(
    name = "personalize-interactions-campaign",
    solutionVersionArn = solution_version_arn,
    minProvisionedTPS = 1
)

campaign_arn = create_campaign_response['campaignArn']
print(json.dumps(create_campaign_response, indent=2))

In [None]:
## Wait for campaign to have ACTIVE STATUS

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_campaign_response = personalize.describe_campaign(
        campaignArn = campaign_arn
    )
    status = describe_campaign_response["campaign"]["status"]
    print("Campaign: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)


In [None]:
# Get recommendations 

items = pd.read_csv('./u.item', sep='\t', usecols=[0,1], header=None)
items.columns = ['ITEM_ID', 'TITLE']

user_id, item_id, _ = data.sample().values[0]
item_title = items.loc[items['ITEM_ID'] == item_id].values[0][-1]
print("USER: {}".format(user_id))
print("ITEM: {}".format(item_title))

items

In [None]:
get_recommendations_response = personalize_runtime.get_recommendations(
    campaignArn = campaign_arn,
    userId = str(user_id)
)

item_list = get_recommendations_response['itemList']
title_list = [items.loc[items['ITEM_ID'] == np.int(item['itemId'])].values[0][-1] for item in item_list]

print("Recommendations: {}".format(json.dumps(title_list, indent=2)))