In [1]:
USE_FULL_MOVIELENS = False
data_dir = "poc_data"
!mkdir $data_dir

if not USE_FULL_MOVIELENS:
    !cd $data_dir && wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
    !cd $data_dir && unzip ml-latest-small.zip
    dataset_dir = data_dir + "/ml-latest-small/"
else:
    !cd $data_dir && wget http://files.grouplens.org/datasets/movielens/ml-25m.zip
    !cd $data_dir && unzip ml-25m.zip
    dataset_dir = data_dir + "/ml-25m/"

--2022-05-05 09:28:28--  http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip’


2022-05-05 09:28:29 (1.33 MB/s) - ‘ml-latest-small.zip’ saved [978202/978202]

Archive:  ml-latest-small.zip
   creating: ml-latest-small/
  inflating: ml-latest-small/links.csv  
  inflating: ml-latest-small/tags.csv  
  inflating: ml-latest-small/ratings.csv  
  inflating: ml-latest-small/README.txt  
  inflating: ml-latest-small/movies.csv  


### Take a look at the data files you have downloaded.

In [2]:
import time
from time import sleep
import json
from datetime import datetime
import boto3
import pandas as pd

In [3]:
# Load the raw ratings table (userId, movieId, rating, timestamp) and preview it.
ratings_data = pd.read_csv(f"{dataset_dir}/ratings.csv")
ratings_data.head(5)

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [4]:
# Load the movie catalogue (movieId, title, genres) and preview it.
movies_data = pd.read_csv(f"{dataset_dir}/movies.csv")
movies_data.head(5)

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [5]:
# Ratings strictly above 3 become "watch" events; the rating value itself is dropped.
watched_df = (
    ratings_data
    .loc[ratings_data['rating'] > 3, ['userId', 'movieId', 'timestamp']]
    .assign(EVENT_TYPE='watch')
)
watched_df.head()

Unnamed: 0,userId,movieId,timestamp,EVENT_TYPE
0,1,1,964982703,watch
1,1,3,964981247,watch
2,1,6,964982224,watch
3,1,47,964983815,watch
4,1,50,964982931,watch


In [6]:
# Ratings strictly above 1 become "click" events (a superset of the watch events).
clicked_df = (
    ratings_data
    .loc[ratings_data['rating'] > 1, ['userId', 'movieId', 'timestamp']]
    .assign(EVENT_TYPE='click')
)
clicked_df.head()

Unnamed: 0,userId,movieId,timestamp,EVENT_TYPE
0,1,1,964982703,click
1,1,3,964981247,click
2,1,6,964982224,click
3,1,47,964983815,click
4,1,50,964982931,click


### Combine click and watch events into a single interactions dataset (EVENT_TYPE `click` and/or `watch`)

In [7]:
# Stack click and watch events into one interactions frame, ordered chronologically.
# DataFrame.append was deprecated in pandas 1.4 and removed in 2.0; pd.concat is the
# supported replacement and always returns a new frame.
interactions_df = pd.concat([clicked_df, watched_df], ignore_index=True)
# mergesort is stable, so for a rating that produced both events the click row stays
# ahead of the watch row when their timestamps are equal.
interactions_df = interactions_df.sort_values(
    "timestamp", ascending=True, kind="mergesort", na_position="last"
)

In [11]:
# Rename columns to the upper-case field names used by the interactions schema.
interactions_df = interactions_df.rename(
    columns={
        'userId': 'USER_ID',
        'movieId': 'ITEM_ID',
        'timestamp': 'TIMESTAMP',
    }
)

In [12]:
interactions_filename = "interactions.csv"
# Write without the pandas index; '%.0f' prints any float column with no decimals.
interactions_df.to_csv(
    f"{data_dir}/{interactions_filename}",
    index=False,
    float_format='%.0f',
)

In [13]:
# Amazon Personalize clients. `personalize` drives the dataset group / schema /
# dataset / import-job calls below; the runtime and events clients are not used in
# this notebook (presumably kept for follow-up notebooks).
personalize = boto3.client(service_name='personalize')
personalize_runtime = boto3.client(service_name='personalize-runtime')
personalize_events = boto3.client(service_name='personalize-events')

# NOTE(review): these two clients are not referenced anywhere in this notebook.
servicediscovery = boto3.client(service_name='servicediscovery')
ssm = boto3.client(service_name='ssm')

In [17]:
# Create the dataset group that will contain the interactions and items datasets.
create_dataset_group_response = personalize.create_dataset_group(name="demo-movies-personalize")
dataset_group_arn = create_dataset_group_response['datasetGroupArn']

print(json.dumps(create_dataset_group_response, indent=2))

{
  "datasetGroupArn": "arn:aws:personalize:eu-west-1:040815733699:dataset-group/demo-movies-personalize",
  "ResponseMetadata": {
    "RequestId": "4ca2b774-8a66-4849-ab3e-eb987295b8c1",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Thu, 05 May 2022 09:32:14 GMT",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "102",
      "connection": "keep-alive",
      "x-amzn-requestid": "4ca2b774-8a66-4849-ab3e-eb987295b8c1"
    },
    "RetryAttempts": 0
  }
}


In [20]:
# Poll once a minute until the dataset group reaches a terminal state, for at most 3 hours.
max_time = time.time() + 3*60*60  # 3 hours
status = None
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    status = describe_dataset_group_response["datasetGroup"]["status"]
    print("Checking the dataset group status: {}".format(status))

    if status in ("ACTIVE", "CREATE FAILED"):
        break

    time.sleep(60)

# Stop loudly instead of letting later cells run against an unusable dataset group.
if status == "CREATE FAILED":
    raise RuntimeError("Dataset group creation failed: {}".format(
        describe_dataset_group_response["datasetGroup"].get("failureReason", "unknown reason")
    ))
if status != "ACTIVE":
    raise TimeoutError("Dataset group still {} after 3 hours".format(status))

Checking the dataset status: ACTIVE


In [21]:
# Avro record schema for the INTERACTIONS dataset. The field names match the columns
# written to interactions.csv after the rename (USER_ID, ITEM_ID, TIMESTAMP, EVENT_TYPE).
interactions_schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {"name": "USER_ID", "type": "string"},
        {"name": "ITEM_ID", "type": "string"},
        {"name": "TIMESTAMP", "type": "long"},
        {"name": "EVENT_TYPE", "type": "string"},
    ],
    "version": "1.0",
}
# `schema` remains an alias of the same dict, as the original chained assignment made it.
schema = interactions_schema

create_schema_response = personalize.create_schema(
    name="demo-personalize-movies-interactions",
    schema=json.dumps(interactions_schema),
)
interaction_schema_arn = create_schema_response['schemaArn']

print("schema created : USER_ID, ITEM_ID, TIMESTAMP, EVENT_TYPE")

schema created : USER_ID, ITEM_ID, TIMESTAMP, EVENT_TYPE


In [22]:
# Register an INTERACTIONS dataset in the dataset group, typed by interaction_schema_arn.
dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
    name="demo-personalize-movies-ints",
    schemaArn=interaction_schema_arn,
    datasetGroupArn=dataset_group_arn,
    datasetType=dataset_type,
)
interactions_dataset_arn = create_dataset_response['datasetArn']

print(json.dumps(create_dataset_response, indent=2))

{
  "datasetArn": "arn:aws:personalize:eu-west-1:040815733699:dataset/demo-movies-personalize/INTERACTIONS",
  "ResponseMetadata": {
    "RequestId": "fb8b5136-3fcb-4b58-bc04-79db5d763135",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Thu, 05 May 2022 09:40:23 GMT",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "104",
      "connection": "keep-alive",
      "x-amzn-requestid": "fb8b5136-3fcb-4b58-bc04-79db5d763135"
    },
    "RetryAttempts": 0
  }
}


In [23]:
# Determine the AWS region. On a SageMaker notebook instance it is read from the
# resource ARN in this metadata file; elsewhere the file does not exist, so fall back
# to the region configured for boto3.
try:
    with open('/opt/ml/metadata/resource-metadata.json') as notebook_info:
        data = json.load(notebook_info)
        resource_arn = data['ResourceArn']
        # ARN layout: arn:partition:service:region:account:resource
        region = resource_arn.split(':')[3]
except FileNotFoundError:
    region = boto3.Session().region_name

if not region:
    raise RuntimeError("Could not determine the AWS region; configure one for boto3 (e.g. AWS_DEFAULT_REGION).")
print("let's create a bucket in", region, "to store our csv files")

let's create a bucket in eu-west-1 to store ou csv files


In [24]:
s3 = boto3.client('s3')
account_id = boto3.client('sts').get_caller_identity().get('Account')
# Account id + region make the name unique; the "personalize" substring keeps the
# bucket within the S3 scope of the AmazonPersonalizeFullAccess managed policy.
bucket_name = account_id + "-" + region + "-" + "demo-personalize"
print(bucket_name)

try:
    # us-east-1 buckets are created without a CreateBucketConfiguration.
    if region == "us-east-1":
        s3.create_bucket(Bucket=bucket_name)
    else:
        s3.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region}
        )
except s3.exceptions.BucketAlreadyOwnedByYou:
    # Re-running the notebook: reuse the bucket created by a previous run.
    print("Bucket already exists in this account; reusing it.")

040815733699-eu-west-1-demo-personalize


In [25]:
# Upload the local interactions CSV to the bucket root under the same file name.
interactions_file_path = f"{data_dir}/{interactions_filename}"
(
    boto3.Session()
    .resource('s3')
    .Bucket(bucket_name)
    .Object(interactions_filename)
    .upload_file(interactions_file_path)
)
interactions_s3DataPath = f"s3://{bucket_name}/{interactions_filename}"
print("uploading our data")

uploading our data


## Uploading our data: granting Amazon Personalize access to the bucket and importing the interactions

In [32]:
# Bucket policy letting the Amazon Personalize service reach this bucket.
# Least privilege: list the bucket and read objects (dataset imports), plus PutObject
# (presumably for any later export / batch-output jobs; drop it if only imports are used).
# The previous "s3:*Object" wildcard also granted DeleteObject and every other object action.
policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket_name),
                "arn:aws:s3:::{}/*".format(bucket_name)
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy))

{'ResponseMetadata': {'RequestId': '6A53E186AH9451YZ',
  'HostId': 'mpv8EU8vcl5crOMFaSRPeR5elohbZYMkt9Os20GfnItRxvhuMYFHs8LD2U4prIDVGiRvLDSRyPA=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': 'mpv8EU8vcl5crOMFaSRPeR5elohbZYMkt9Os20GfnItRxvhuMYFHs8LD2U4prIDVGiRvLDSRyPA=',
   'x-amz-request-id': '6A53E186AH9451YZ',
   'date': 'Thu, 05 May 2022 10:07:15 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}

In [33]:
iam = boto3.client("iam")

role_name = "PersonalizeRolePOC"
# Trust policy: only the Amazon Personalize service may assume this role.
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}

try:
    create_role_response = iam.create_role(
        RoleName = role_name,
        AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
    )
except iam.exceptions.EntityAlreadyExistsException:
    # Re-running the notebook: reuse the existing role. get_role returns the same
    # {"Role": {...}} shape, so role_arn is read identically below.
    create_role_response = iam.get_role(RoleName = role_name)

# AmazonPersonalizeFullAccess provides access to any S3 bucket with a name that includes "personalize" or "Personalize" 
# if you would like to use a bucket with a different name, please consider creating and attaching a new policy
# that provides read access to your bucket or attaching the AmazonS3ReadOnlyAccess policy to the role
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
    RoleName = role_name,
    PolicyArn = policy_arn
)

# No extra S3 policy is attached: bucket_name contains "personalize", so the managed
# policy above already covers it. AmazonS3FullAccess would grant this role
# read/write/delete on every bucket in the account.
time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

role_arn = create_role_response["Role"]["Arn"]
print(role_arn)

arn:aws:iam::040815733699:role/PersonalizeRolePOC


In [34]:
# Start a bulk import of interactions.csv from S3 into the INTERACTIONS dataset;
# Personalize reads the bucket using role_arn.
create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName="demo-personalize-import-interactions",
    datasetArn=interactions_dataset_arn,
    roleArn=role_arn,
    dataSource={"dataLocation": f"s3://{bucket_name}/{interactions_filename}"},
)
dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']

print(json.dumps(create_dataset_import_job_response, indent=2))

{
  "datasetImportJobArn": "arn:aws:personalize:eu-west-1:040815733699:dataset-import-job/demo-personalize-import-interactions",
  "ResponseMetadata": {
    "RequestId": "e8a87d0e-dbc1-4bcb-acbc-e050df7abdbb",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Thu, 05 May 2022 10:08:27 GMT",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "124",
      "connection": "keep-alive",
      "x-amzn-requestid": "e8a87d0e-dbc1-4bcb-acbc-e050df7abdbb"
    },
    "RetryAttempts": 0
  }
}


In [35]:
%%time

max_time = time.time() + 6*60*60 # 6 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_job_arn
    )
    status = describe_dataset_import_job_response["datasetImportJob"]['status']
    print("DatasetImportJob: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

DatasetImportJob: CREATE PENDING
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: ACTIVE
CPU times: user 67.1 ms, sys: 1.34 ms, total: 68.4 ms
Wall time: 4min


In [36]:
# Load movies.csv again into a separate frame that the item-metadata cells modify.
original_data = pd.read_csv(f"{dataset_dir}/movies.csv")
original_data.head(5)

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [37]:
# Extract the release year from titles like "Toy Story (1995)". The pattern takes the
# four digits opening the final parenthesised group at the end of the title, so a range
# such as "(2006–2007)" yields "2006" and trailing whitespace is tolerated. Titles with
# no trailing year become NaN (removed by the later dropna) instead of leaking arbitrary
# parenthesised text into the int YEAR field. The raw string avoids the invalid "\("
# escape-sequence warning.
original_data['year'] = original_data['title'].str.extract(r'\((\d{4})[^)]*\)\s*$', expand=False)
original_data.head(5)

Unnamed: 0,movieId,title,genres,year
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,1995
1,2,Jumanji (1995),Adventure|Children|Fantasy,1995
2,3,Grumpier Old Men (1995),Comedy|Romance,1995
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance,1995
4,5,Father of the Bride Part II (1995),Comedy,1995


In [38]:
# Drop every row that has any missing value (presumably titles with no extractable year).
original_data = original_data.dropna()

In [39]:
# Keep only the columns that become item metadata, as an independent copy.
itemmetadata_df = original_data[['movieId', 'genres', 'year']].copy()
itemmetadata_df.head()

Unnamed: 0,movieId,genres,year
0,1,Adventure|Animation|Children|Comedy|Fantasy,1995
1,2,Adventure|Children|Fantasy,1995
2,3,Comedy|Romance,1995
3,4,Comedy|Drama|Romance,1995
4,5,Comedy,1995


In [40]:
# Constant CREATION_TIMESTAMP of 0 for every item (the items schema types it as long).
itemmetadata_df = itemmetadata_df.assign(CREATION_TIMESTAMP=0)

In [41]:
# Rename columns to the field names declared in the items schema.
itemmetadata_df = itemmetadata_df.rename(
    columns={'movieId': 'ITEM_ID', 'genres': 'GENRE', 'year': 'YEAR'}
)

In [42]:
itemmetadata_filename = "item-meta.csv"
# Write without the pandas index; '%.0f' prints any float column with no decimals.
itemmetadata_df.to_csv(
    f"{data_dir}/{itemmetadata_filename}",
    index=False,
    float_format='%.0f',
)

### Creating the Items schema and dataset

In [43]:
# Avro record schema for the ITEMS dataset. GENRE is flagged categorical; its values
# are the pipe-separated genre strings from movies.csv.
itemmetadata_schema = {
    "type": "record",
    "name": "Items",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {"name": "ITEM_ID", "type": "string"},
        {"name": "GENRE", "type": "string", "categorical": True},
        {"name": "YEAR", "type": "int"},
        {"name": "CREATION_TIMESTAMP", "type": "long"},
    ],
    "version": "1.0",
}

create_schema_response = personalize.create_schema(
    name="demo-personalize-movies-item",
    schema=json.dumps(itemmetadata_schema),
)
itemmetadataschema_arn = create_schema_response['schemaArn']

print(json.dumps(create_schema_response, indent=2))

{
  "schemaArn": "arn:aws:personalize:eu-west-1:040815733699:schema/demo-personalize-movies-item",
  "ResponseMetadata": {
    "RequestId": "e5d856cc-e44e-49d8-b3b6-0c4a76282917",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Thu, 05 May 2022 10:13:34 GMT",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "94",
      "connection": "keep-alive",
      "x-amzn-requestid": "e5d856cc-e44e-49d8-b3b6-0c4a76282917"
    },
    "RetryAttempts": 0
  }
}


In [44]:
# Register an ITEMS dataset in the same dataset group, typed by itemmetadataschema_arn.
dataset_type = "ITEMS"
create_dataset_response = personalize.create_dataset(
    name="demo-personalize-movies-items",
    schemaArn=itemmetadataschema_arn,
    datasetGroupArn=dataset_group_arn,
    datasetType=dataset_type,
)
items_dataset_arn = create_dataset_response['datasetArn']

print(json.dumps(create_dataset_response, indent=2))

{
  "datasetArn": "arn:aws:personalize:eu-west-1:040815733699:dataset/demo-movies-personalize/ITEMS",
  "ResponseMetadata": {
    "RequestId": "dd3df582-1288-4b47-bf3b-3a8095e89e6c",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Thu, 05 May 2022 10:13:35 GMT",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "97",
      "connection": "keep-alive",
      "x-amzn-requestid": "dd3df582-1288-4b47-bf3b-3a8095e89e6c"
    },
    "RetryAttempts": 0
  }
}


In [45]:
# Upload the local item-metadata CSV to the bucket root under the same file name.
itemmetadata_file_path = data_dir + "/" + itemmetadata_filename
boto3.Session().resource('s3').Bucket(bucket_name).Object(itemmetadata_filename).upload_file(itemmetadata_file_path)
# Store the item path in its own variable. Assigning it to interactions_s3DataPath (as
# before) silently overwrote the interactions S3 path computed when that file was uploaded.
itemmetadata_s3DataPath = "s3://"+bucket_name+"/"+itemmetadata_filename

In [47]:
# Start a bulk import of item-meta.csv from S3 into the ITEMS dataset; Personalize
# reads the bucket using role_arn. This rebinds dataset_import_job_arn to the new job.
create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName="personalize-poc-item-import1",
    datasetArn=items_dataset_arn,
    roleArn=role_arn,
    dataSource={"dataLocation": f"s3://{bucket_name}/{itemmetadata_filename}"},
)
dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']

print(json.dumps(create_dataset_import_job_response, indent=2))

{
  "datasetImportJobArn": "arn:aws:personalize:eu-west-1:040815733699:dataset-import-job/personalize-poc-item-import1",
  "ResponseMetadata": {
    "RequestId": "57322b58-12a9-4cf1-8609-29b1e7662c7a",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Thu, 05 May 2022 10:15:11 GMT",
      "content-type": "application/x-amz-json-1.1",
      "content-length": "116",
      "connection": "keep-alive",
      "x-amzn-requestid": "57322b58-12a9-4cf1-8609-29b1e7662c7a"
    },
    "RetryAttempts": 0
  }
}


In [49]:
%%time

max_time = time.time() + 6*60*60 # 6 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_job_arn
    )
    status = describe_dataset_import_job_response["datasetImportJob"]['status']
    print("DatasetImportJob: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

DatasetImportJob: ACTIVE
CPU times: user 17.2 ms, sys: 399 µs, total: 17.6 ms
Wall time: 63.8 ms


In [50]:
# Persist the configuration, resource names and ARNs created in this notebook to
# IPython's store, so another notebook/kernel can restore them with `%store -r`.
# Comments stay on their own lines: text after a %store argument would be parsed as
# another variable name.
# NOTE(review): dataset_import_job_arn and the S3 data-path variables are not stored;
# add them if a later notebook needs them.
%store USE_FULL_MOVIELENS
%store dataset_dir
%store interactions_dataset_arn
%store dataset_group_arn
%store bucket_name
%store role_arn
%store role_name
%store data_dir
%store region
%store interaction_schema_arn
%store items_dataset_arn
%store itemmetadataschema_arn

Stored 'USE_FULL_MOVIELENS' (bool)
Stored 'dataset_dir' (str)
Stored 'interactions_dataset_arn' (str)
Stored 'dataset_group_arn' (str)
Stored 'bucket_name' (str)
Stored 'role_arn' (str)
Stored 'role_name' (str)
Stored 'data_dir' (str)
Stored 'region' (str)
Stored 'interaction_schema_arn' (str)
Stored 'items_dataset_arn' (str)
Stored 'itemmetadataschema_arn' (str)
