# Uploading To S3 and Personalize

In the previous notebook you converted the Netflix dataset into two CSV files.

The code below will complete the following:

1. Upload to S3
1. Connect policies for Personalize to the Bucket holding the data
1. Create dataset groups and datasets for Personalize
1. Import data for Personalize


Before getting started, create a bucket in the console and update the line below with your bucket name.

In [1]:
bucket = "chriskingnetflixdemo"

In [2]:
# Imports
import boto3

import json
import numpy as np
import pandas as pd
import time
import datetime
import csv

In [11]:
# define files
item_metadata = "items.csv"
interactions = "interactions.csv"

In [12]:
# Configure the SDK to Personalize:
personalize = boto3.client('personalize')
personalize_runtime = boto3.client('personalize-runtime')

## Upload files

In [6]:
# Item Metadata
boto3.Session().resource('s3').Bucket(bucket).Object(item_metadata).upload_file(item_metadata)

In [10]:
# Interaction Data
boto3.Session().resource('s3').Bucket(bucket).Object(interactions).upload_file(interactions)
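
The two uploads above each build a new session. As a minimal sketch, the same step could be written as one loop over a shared S3 client. The helper name `upload_files` is hypothetical, and `s3_client` is assumed to be an object such as `boto3.client("s3")` that provides `upload_file(Filename, Bucket, Key)`.

```python
def upload_files(s3_client, bucket_name, filenames):
    """Upload each local file under a key equal to its filename; return the S3 URIs."""
    uris = []
    for filename in filenames:
        # Positional arguments are Filename, Bucket, Key (boto3 S3 client order).
        s3_client.upload_file(filename, bucket_name, filename)
        uris.append("s3://{}/{}".format(bucket_name, filename))
    return uris
```

With a real client this could be called as `upload_files(boto3.client("s3"), bucket, [item_metadata, interactions])`. The returned URIs have the same shape as the `dataLocation` used by the import job later in this notebook.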

## Create Schema

The next major step is creating a schema that describes the data and registering it with the Personalize service. This notebook only creates the schema for the interactions file.

### Creating the Interaction Schema First

Interactions data is required for Personalize to function, so we will start with it; it was the last data exported. If you get an error that the resource already exists, change the `name` argument below.

In [15]:
interactions_schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "USER_ID",
            "type": "string"
        },
        {
            "name": "ITEM_ID",
            "type": "string"
        },
        {
            "name": "TIMESTAMP",
            "type": "long"
        }
    ],
    "version": "1.0"
}

create_schema_response = personalize.create_schema(
    name = "netflix-interactions",
    schema = json.dumps(interactions_schema)
)

schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))

{
  "schemaArn": "arn:aws:personalize:us-east-1:059124553121:schema/netflix-interactions",
  "ResponseMetadata": {
    "RequestId": "3476ee2d-2087-41ff-ba5e-c5d53ca72f3e",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "content-type": "application/x-amz-json-1.1",
      "date": "Sun, 14 Jul 2019 18:15:28 GMT",
      "x-amzn-requestid": "3476ee2d-2087-41ff-ba5e-c5d53ca72f3e",
      "content-length": "86",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}
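
Before importing, it can help to confirm that the CSV actually contains the columns the schema names. The sketch below assumes the file has a header row whose column names should match the schema's field names. `missing_schema_columns` is a hypothetical helper, and the in-memory sample stands in for the real file.

```python
import csv
import io

def missing_schema_columns(schema, csv_file):
    """Return the schema field names that do not appear in the CSV header row."""
    header = next(csv.reader(csv_file), [])
    required = [field["name"] for field in schema["fields"]]
    return [name for name in required if name not in header]

# Same field names as the interactions schema above.
example_schema = {
    "fields": [
        {"name": "USER_ID", "type": "string"},
        {"name": "ITEM_ID", "type": "string"},
        {"name": "TIMESTAMP", "type": "long"},
    ]
}

# Illustrative data only; this header is deliberately missing TIMESTAMP.
sample = io.StringIO("USER_ID,ITEM_ID\n1,2\n")
print(missing_schema_columns(example_schema, sample))  # prints ['TIMESTAMP']
```

For the real file, you could pass `open(interactions, newline="")` in place of the sample; an empty list means every schema field has a matching column.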


### Create and Wait for Dataset Group

The largest grouping in Personalize is a Dataset Group. It isolates your data, event trackers, solutions, and campaigns, grouping together things that share a common collection of data. Feel free to alter the name below if you'd like.

#### Create Dataset Group

In [16]:
create_dataset_group_response = personalize.create_dataset_group(
    name = "personalize-nf-demo"
)

dataset_group_arn = create_dataset_group_response['datasetGroupArn']
print(json.dumps(create_dataset_group_response, indent=2))

{
  "datasetGroupArn": "arn:aws:personalize:us-east-1:059124553121:dataset-group/personalize-nf-demo",
  "ResponseMetadata": {
    "RequestId": "57f42b16-29bc-4a1f-9090-b7f869af6861",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "content-type": "application/x-amz-json-1.1",
      "date": "Sun, 14 Jul 2019 18:16:28 GMT",
      "x-amzn-requestid": "57f42b16-29bc-4a1f-9090-b7f869af6861",
      "content-length": "98",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}


##### Wait for Dataset Group to Have ACTIVE Status

The Dataset Group must be active before it can be used in any of the steps below. Execute the cell below and wait for it to show ACTIVE.

In [17]:
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    status = describe_dataset_group_response["datasetGroup"]["status"]
    print("DatasetGroup: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

DatasetGroup: ACTIVE
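
The same wait-until-ACTIVE loop appears again for the import job, so it could be factored into one helper. This is a sketch, not part of the original notebook: `wait_for_status` and its parameters are hypothetical names, and the terminal statuses are the two this notebook already checks for.

```python
import time

def wait_for_status(get_status, label, timeout_seconds=3 * 60 * 60,
                    poll_seconds=60, sleep=time.sleep):
    """Call get_status() until it returns a terminal status or the timeout passes."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status = get_status()
        print("{}: {}".format(label, status))
        if status in ("ACTIVE", "CREATE FAILED"):
            return status
        sleep(poll_seconds)
    # Unlike the loop above, a timeout is reported instead of falling through silently.
    raise TimeoutError("{} did not reach a terminal status in time".format(label))
```

For the Dataset Group this could be called as `wait_for_status(lambda: personalize.describe_dataset_group(datasetGroupArn=dataset_group_arn)["datasetGroup"]["status"], "DatasetGroup")`. Passing `sleep` as a parameter is a design choice that lets the helper be exercised without real delays.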


### Create Dataset

After the group, the next thing to create is the datasets themselves. In this example we will create only one, for the interactions data. Execute the cell below to create it.



In [18]:
dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
    name = "netflix-demo-interactions",
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = schema_arn
)

dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))

{
  "datasetArn": "arn:aws:personalize:us-east-1:059124553121:dataset/personalize-nf-demo/INTERACTIONS",
  "ResponseMetadata": {
    "RequestId": "534073b6-f0f9-4943-beae-ca95490ad18a",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "content-type": "application/x-amz-json-1.1",
      "date": "Sun, 14 Jul 2019 18:17:26 GMT",
      "x-amzn-requestid": "534073b6-f0f9-4943-beae-ca95490ad18a",
      "content-length": "100",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}


#### Attach Policy to S3 Bucket

Amazon Personalize needs to read the contents of the S3 bucket you created earlier. The cell below attaches a bucket policy that grants the Personalize service `s3:GetObject` and `s3:ListBucket` on that bucket.


In [19]:
s3 = boto3.client("s3")

policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))

{'ResponseMetadata': {'RequestId': '5E3D02B3D8A79677',
  'HostId': 'GXG/jAFfJQTr3zDOooPSi9LEoKF7XJVKGcSI4DvgDSh/XuMdZVU9WfTbmyfCDLgDv91bhde87W4=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': 'GXG/jAFfJQTr3zDOooPSi9LEoKF7XJVKGcSI4DvgDSh/XuMdZVU9WfTbmyfCDLgDv91bhde87W4=',
   'x-amz-request-id': '5E3D02B3D8A79677',
   'date': 'Sun, 14 Jul 2019 18:17:54 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}

### Create Personalize Role

Amazon Personalize also needs an IAM role it can assume in order to have permission to execute certain tasks. The cell below creates that role and attaches a policy to it.

In [20]:
iam = boto3.client("iam")

role_name = "PersonalizeRoleDemoNF"
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}

create_role_response = iam.create_role(
    RoleName = role_name,
    AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)

# AmazonPersonalizeFullAccess provides access to any S3 bucket with a name that includes "personalize" or "Personalize" 
# if you would like to use a bucket with a different name, please consider creating and attaching a new policy
# that provides read access to your bucket or attaching the AmazonS3ReadOnlyAccess policy to the role
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
    RoleName = role_name,
    PolicyArn = policy_arn
)

time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

role_arn = create_role_response["Role"]["Arn"]
print(role_arn)

arn:aws:iam::059124553121:role/PersonalizeRoleDemoNF
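
Re-running the cell above fails once the role exists, because `create_role` rejects a duplicate role name. One way to make the step repeatable is to fall back to `get_role` in that case. The helper `get_or_create_role_arn` is a hypothetical name, and `iam_client` is assumed to be a `boto3.client("iam")` object.

```python
import json

def get_or_create_role_arn(iam_client, role_name, trust_policy):
    """Return the ARN of role_name, creating the role only if it does not exist yet."""
    try:
        response = iam_client.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy),
        )
    except iam_client.exceptions.EntityAlreadyExistsException:
        # The role was created by an earlier run; look it up instead.
        response = iam_client.get_role(RoleName=role_name)
    return response["Role"]["Arn"]
```

In this notebook it could be called as `get_or_create_role_arn(iam, role_name, assume_role_policy_document)`. Note that the sketch does not check whether an existing role has the same trust policy, so that remains something to verify by hand.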


### Import the data

Earlier you created the DatasetGroup and Dataset to house your information. Now you will execute an import job that loads the data from S3 into Amazon Personalize for use in building your model.

#### Create Dataset Import Job


In [25]:
create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "nf-demo-import1",
    datasetArn = dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, interactions)
    },
    roleArn = role_arn
)

dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

{
  "datasetImportJobArn": "arn:aws:personalize:us-east-1:059124553121:dataset-import-job/nf-demo-import1",
  "ResponseMetadata": {
    "RequestId": "2f1ee7ae-811c-4bc2-b539-b4b61099dd06",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "content-type": "application/x-amz-json-1.1",
      "date": "Sun, 14 Jul 2019 18:22:11 GMT",
      "x-amzn-requestid": "2f1ee7ae-811c-4bc2-b539-b4b61099dd06",
      "content-length": "103",
      "connection": "keep-alive"
    },
    "RetryAttempts": 0
  }
}


In [27]:
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_job_arn
    )
    
    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: CREATE IN_PROGRESS
DatasetImportJob: ACTIVE
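
The loop above stops on `CREATE FAILED` but does not say why the job failed. As a small sketch, the describe response can be summarized so that a failure is reported with its reason. `import_job_summary` is a hypothetical helper, and it assumes the response carries a `failureReason` field when the job fails.

```python
def import_job_summary(describe_response):
    """Reduce a describe_dataset_import_job response to its status and any failure reason."""
    job = describe_response["datasetImportJob"]
    summary = {"status": job["status"]}
    if job["status"] == "CREATE FAILED":
        # Fall back to a placeholder if the service did not report a reason.
        summary["failureReason"] = job.get("failureReason", "not reported")
    return summary
```

After the loop finishes, `print(import_job_summary(describe_dataset_import_job_response))` would show the final state. Failed imports are often caused by permissions or CSV formatting, so the reason is worth reading before retrying.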
