# 1. Preparing a sample product dataset from Amazon Reviews

This notebook walks you through setting up product catalog data using the Amazon Reviews dataset available [here](https://amazon-reviews-2023.github.io/). We use the 2018 Amazon Fashion metadata file because it is small enough for this example.

In this notebook, we pre-process the metadata to remove undesirable inputs (e.g. entire HTML pages in text fields, duplicate products, and rows with blank fields) before inserting the data into a product database in RDS (Aurora PostgreSQL) using the RDS Data API.

The [RDS Data API](https://docs.aws.amazon.com/rdsdataservice/latest/APIReference/Welcome.html) lets us interact with RDS through HTTP API calls. Instead of connecting with a PostgreSQL client and database credentials, it relies on IAM permissions and a secret stored in [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/). This is especially helpful if your SageMaker domain and/or RDS cluster (e.g. serverless) is not bound to a VPC. See more details and prerequisites [here](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/data-api.html).
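
With the Data API, each SQL statement is sent as an `execute_statement` request, and values can be bound to named placeholders (`:name`) through the `parameters` argument instead of being formatted into the SQL string. The sketch below shows a hypothetical helper, `to_data_api_parameters`, that converts a plain Python dict into that typed parameter list. It only covers the scalar types used in this notebook and is an illustration, not part of the lab code; the ASIN used is made up.

```python
from typing import Any, Dict, List


def to_data_api_parameters(values: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert {name: value} into the RDS Data API `parameters` format (hypothetical helper)."""
    params = []
    for name, value in values.items():
        if value is None:
            field = {"isNull": True}
        elif isinstance(value, bool):  # check bool before int: bool is a subclass of int
            field = {"booleanValue": value}
        elif isinstance(value, int):
            field = {"longValue": value}
        elif isinstance(value, float):
            field = {"doubleValue": value}
        else:
            field = {"stringValue": str(value)}
        params.append({"name": name, "value": field})
    return params


# ":asin" is a named placeholder; "B0000EXAMPLE" is a made-up ASIN
sql = "SELECT title, price FROM products WHERE asin = :asin"
params = to_data_api_parameters({"asin": "B0000EXAMPLE"})
# These would be passed as:
# rdsdata.execute_statement(resourceArn=..., secretArn=..., database=..., sql=sql, parameters=params)
print(params)
```

Binding values this way avoids quoting problems when product titles or descriptions contain apostrophes.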

Before beginning the lab, [create an RDS Aurora PostgreSQL database cluster](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Aurora.CreateInstance.html) or select an existing one, taking into account the prerequisites described in the [guide for using it as a vector database](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/AuroraPostgreSQL.VectorDB.html). If you create a new database cluster, make sure to create an **initial database** by specifying an initial database name (e.g. postgres) under 'Additional Configuration'. If you are using an existing cluster, we recommend creating a new database just for this lab. Also, follow these [instructions to create a database secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_database_secret.html) for your database credentials.

Check that your SageMaker domain IAM role has a custom inline policy like the [one provided](sagemakerpolicy.json) as well as the `AmazonBedrockFullAccess` managed policy. Also make sure you have enabled access to Claude 3 models, Cohere embedding model(s) and/or the Titan embedding model in Amazon Bedrock [Model Access](https://docs.aws.amazon.com/bedrock/latest/userguide/model-access.html). Both IAM policies are intentionally broad so that the sample works smoothly; scope them down to more granular permissions when used in practice.

1.0. [Set up](#1.0)

1.1. [Download and process data](#1.1)

1.2. [Insert data into Aurora PostgreSQL database](#1.2)

1.3. [Create product summaries](#1.3)

1.4. [\[Optional\] Create embeddings](#1.4)

## <a id="1.0">1.0 Set up</a>

In [None]:
# run this cell to upgrade to the latest version of boto3 if required, and restart the kernel
%pip install --upgrade --quiet botocore boto3

In [None]:
%load_ext autoreload
%autoreload 2

# Python Built-Ins:
import gzip
import logging
import json
import re
from time import sleep

# External Dependencies:
import boto3
import numpy as np
import pandas as pd
import sagemaker
from tqdm import tqdm

<div class="alert alert-block alert-warning">

IMPORTANT! Please copy and paste the required information for your <b>RDS Aurora PostgreSQL database</b> (cluster identifier, cluster ARN, secret ARN and database name) in the cell below.
    
</div>

In [None]:
sess = sagemaker.Session()
bucket = sess.default_bucket()
region = sess.boto_region_name
accountid = sess.account_id()
product_db_data_path = 'amazon-reviews-fashion-metadata'
bedrock_kb_data_path = 'bedrock-kb-data'
bedrock_kb_datasource_uri = f's3://{bucket}/{bedrock_kb_data_path}/'

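database_identifier = '<TODO>'  # DB cluster identifier
database_arn = '<TODO>'  # DB cluster ARN (arn:aws:rds:<region>:<account-id>:cluster:<identifier>)
database_secret_arn = '<TODO>'  # ARN of the Secrets Manager secret holding the DB credentials
database_name = '<TODO>'  # e.g. the initial database created with the cluster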
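
Before running the rest of the notebook, it can help to catch placeholders that were not filled in. The following is a minimal sketch with a hypothetical helper, `check_database_config`; it assumes the standard Aurora cluster ARN layout `arn:aws:rds:<region>:<account-id>:cluster:<cluster-identifier>` and only checks the shape of the values, not whether the resources exist.

```python
import re

# Aurora DB cluster ARNs: arn:<partition>:rds:<region>:<account-id>:cluster:<cluster-identifier>
CLUSTER_ARN_PATTERN = re.compile(r"^arn:aws[a-z-]*:rds:([a-z0-9-]+):(\d{12}):cluster:(.+)$")
SECRET_ARN_PATTERN = re.compile(r"^arn:aws[a-z-]*:secretsmanager:")


def check_database_config(identifier, cluster_arn, secret_arn, db_name):
    """Hypothetical sanity check for the database settings; returns the cluster's region."""
    values = {
        "database_identifier": identifier,
        "database_arn": cluster_arn,
        "database_secret_arn": secret_arn,
        "database_name": db_name,
    }
    # Any value still set to the '<TODO>' placeholder (or empty) is reported
    missing = [name for name, value in values.items() if not value or value == "<TODO>"]
    if missing:
        raise ValueError(f"Please fill in: {', '.join(missing)}")
    match = CLUSTER_ARN_PATTERN.match(cluster_arn)
    if match is None:
        raise ValueError("database_arn does not look like an Aurora cluster ARN")
    if match.group(3) != identifier:
        raise ValueError("database_identifier does not match the cluster ARN")
    if not SECRET_ARN_PATTERN.match(secret_arn):
        raise ValueError("database_secret_arn does not look like a Secrets Manager ARN")
    return match.group(1)
```

In the notebook this could be called as `check_database_config(database_identifier, database_arn, database_secret_arn, database_name)` right after filling in the cell.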

## <a id="1.1">1.1 Download and process data</a>

In [None]:
!curl -O https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_v2/metaFiles2/meta_AMAZON_FASHION.json.gz

In [None]:
dataframe = pd.read_json('meta_AMAZON_FASHION.json.gz', lines=True)

# Keep only the columns we need, then drop rows with missing values and duplicate ASINs
items = dataframe[
    ['asin','title','brand','price','description','imageURLHighRes']
].dropna().drop_duplicates(subset='asin').reset_index(drop=True)
items = items.rename(columns={'imageURLHighRes': 'image'})
# Drop items whose title contains "error" (case-insensitive)
items = items[
    items["title"].str.contains("error", flags=re.IGNORECASE, regex=True) == False
].reset_index(drop=True)
# Drop items whose price contains '-' (e.g. price ranges such as "$10.00 - $20.00")
items = items[items["price"].str.contains('-') == False].reset_index(drop=True)

# Remove stray characters from the text fields
items['asin'] = items['asin'].map(lambda x: re.sub(r'\W+', '', x))
items['title'] = items['title'].str.replace(r"[^0-9a-zA-Z ]+", " ", regex=True)
# `description` and `image` hold lists; strip the brackets and quotes left over from str()
items['description'] = items['description'].astype(str).map(
    lambda x: x.strip('({[])}\'"')
).str.replace(r'[^0-9a-zA-Z.,"/ ]+', " ", regex=True)
items['image'] = items['image'].astype(str).map(lambda x: x.strip('({[])}\'"'))
items['price'] = items['price'].map(lambda x: x.strip('$').replace(',', '')).astype(float)

# Write a pipe-delimited file without header or index; it is imported into RDS in section 1.2
items.to_csv('items.txt', sep='|', index=False, header=False)
# OPTIONAL: also save a regular CSV for inspection
# items.to_csv('items.csv')

s3_sess = boto3.Session().resource('s3')
s3_sess.Bucket(bucket).upload_file('items.txt',f'{product_db_data_path}/items.txt')

items
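
The column-wise cleaning rules above can also be read as per-value functions. The following stdlib-only sketch restates them (the function names are hypothetical) so they can be tried on a single record without loading the full dataset. It mirrors the notebook's regular expressions but is not a drop-in replacement for the pandas code.

```python
import re


def clean_asin(asin: str) -> str:
    # Drop any non-word characters
    return re.sub(r"\W+", "", asin)


def clean_title(title: str) -> str:
    # Replace each run of characters other than letters, digits and spaces with one space
    return re.sub(r"[^0-9a-zA-Z ]+", " ", title)


def clean_description(raw) -> str:
    # The raw field is a list of strings, so str() yields "['...']"; strip the wrapping brackets/quotes
    text = str(raw).strip("({[])}'\"")
    # Keep letters, digits, spaces and . , " / ; replace everything else (e.g. HTML angle brackets)
    return re.sub(r'[^0-9a-zA-Z.,"/ ]+', " ", text)


def parse_price(raw: str):
    # Prices containing '-' (ranges) are filtered out in the notebook; return None for them here
    if "-" in raw:
        return None
    return float(raw.strip("$").replace(",", ""))


print(clean_title("Women's T-Shirt (Blue)"))
print(parse_price("$1,299.99"))
```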

## <a id="1.2">1.2 Insert data into Aurora PostgreSQL database</a>

### Prerequisites:
- You must have created a database instance, e.g. using [RDS console](https://console.aws.amazon.com/rds/databases) with an admin user and password, and a default database (e.g. postgres)
- You must have created a secret to store the database credentials at [AWS Secrets Manager](https://console.aws.amazon.com/secretsmanager/)

### Create an IAM policy and role for RDS to import data from S3

In [None]:
rds_trust_policy = f'''{{
    "Version": "2012-10-17",
    "Statement": [
        {{
            "Sid": "",
            "Effect": "Allow",
            "Principal": {{
                "Service": "rds.amazonaws.com"
            }},
            "Action": "sts:AssumeRole"
        }}
    ]
}}'''
featurename = 's3Import'
rds_s3_role_name = 'retailai-rds-s3-role'
rds_s3_role_description = 'IAM role for RDS to import data from S3'
rds_s3_policy_name = 'retailai-rds-s3-import-policy'
rds_s3_policy_description = 'IAM policy for RDS to import data from S3'
rds_s3_policy_document = f'''{{
    "Version": "2012-10-17",
    "Statement": [
        {{
            "Sid": "s3import",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::{bucket}",
                "arn:aws:s3:::{bucket}/*"
            ]
        }}
    ]
}}'''

iam = boto3.client('iam')
try:
    response_policy = iam.create_policy(
        PolicyName=rds_s3_policy_name,
        PolicyDocument=rds_s3_policy_document,
        Description=rds_s3_policy_description,
    )
except Exception as e:
    print('Failed to create policy.', e)

try:
    response_role = iam.create_role(
        RoleName=rds_s3_role_name,
        AssumeRolePolicyDocument=rds_trust_policy,
        Description=rds_s3_role_description,
    )
except Exception as e:
    print('Failed to create role.', e)

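# Attach the policy to the role, and the role to the DB cluster, only if both were created above
try:
    response_policy
    response_role
except NameError as e:
    print('Skipping attachment because the policy or role was not created.', e)
else:
    add_policy = iam.attach_role_policy(
        RoleName=rds_s3_role_name,
        PolicyArn=response_policy['Policy']['Arn']
    )
    sleep(5)  # Allow the IAM changes to propagate before attaching the role to the DB cluster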

    rdsclient = boto3.client('rds')
    try:
        response_db_role = rdsclient.add_role_to_db_cluster(
            DBClusterIdentifier=database_identifier,
            RoleArn=response_role['Role']['Arn'],
            FeatureName=featurename
        )
    except Exception as e:
        print('Failed to attach role to DB.', e)
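
The policy documents above are written as f-strings, which requires doubling every literal brace. An alternative, shown here as a sketch with a hypothetical helper name, is to build the same documents as Python dicts and serialize them with `json.dumps`. The output can be passed unchanged as `AssumeRolePolicyDocument` and `PolicyDocument`, and a typo then surfaces as a Python error rather than as malformed JSON rejected by IAM.

```python
import json


def build_rds_s3_policies(bucket_name: str):
    """Return (trust_policy_json, s3_import_policy_json) for the RDS S3-import role (hypothetical helper)."""
    # Allows the RDS service to assume the role
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "rds.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }
    # Read access to the bucket: ListBucket applies to the bucket ARN, GetObject to the object ARNs
    s3_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "s3import",
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket_name}", f"arn:aws:s3:::{bucket_name}/*"],
            }
        ],
    }
    return json.dumps(trust_policy), json.dumps(s3_policy)


trust_json, s3_json = build_rds_s3_policies("my-example-bucket")
print(s3_json)
```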

### Create and run queries to import data from S3 into RDS

In [None]:
sql_queries = [
    # aws_s3 provides table_import_from_s3; CASCADE also installs its dependency, aws_commons
    "CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;",
    "DROP TABLE IF EXISTS products;",
    """
    CREATE TABLE "products" (
      "asin" VARCHAR(1024) PRIMARY KEY,
      "title" TEXT,
      "brand" TEXT,
      "price" NUMERIC,
      "description" TEXT,
      "image" TEXT
    );
    """,
    # Import the pipe-delimited items.txt file written in section 1.1 into the products table
    f"""
    SELECT aws_s3.table_import_from_s3(
      'products',
      'asin,title,brand,price,description,image',
      'DELIMITER ''|''',
      aws_commons.create_s3_uri('{bucket}', '{product_db_data_path}/items.txt', '{region}')
    );
    """,
]

rdsdata = boto3.client('rds-data')

for query in sql_queries:
    response = rdsdata.execute_statement(
        resourceArn=database_arn,
        secretArn=database_secret_arn,
        sql=query,
        database=database_name,
    )
    print(response)
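
After the import you may want to check how many rows were loaded, for example with `SELECT COUNT(*) FROM products;`. By default the Data API returns rows in `records`, a list of records where each column value is a typed field such as `{"longValue": 42}`. (It can also return a JSON string when `formatRecordsAs='JSON'` is set.) The sketch below, using hypothetical helper names, unwraps the default format into plain Python values; it does not handle array values.

```python
from typing import Any, Dict, List


def field_to_python(field: Dict[str, Any]) -> Any:
    """Unwrap one RDS Data API field (e.g. {"longValue": 3}) into a plain Python value."""
    if field.get("isNull"):
        return None
    # Each field carries exactly one typed value key
    for key in ("stringValue", "longValue", "doubleValue", "booleanValue", "blobValue"):
        if key in field:
            return field[key]
    raise ValueError(f"Unsupported field type: {field}")


def records_to_rows(response: Dict[str, Any]) -> List[List[Any]]:
    """Convert the `records` of an execute_statement response into lists of Python values."""
    return [[field_to_python(f) for f in record] for record in response.get("records", [])]


# In the notebook, e.g.:
# response = rdsdata.execute_statement(resourceArn=database_arn, secretArn=database_secret_arn,
#                                      database=database_name, sql="SELECT COUNT(*) FROM products;")
# print(records_to_rows(response)[0][0])
```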

## <a id="1.3">1.3 Create product summaries</a>

This section defines a function that uses Claude 3 Haiku on Amazon Bedrock to summarize long product descriptions, so that each product document later ingested by Amazon Bedrock Knowledge Bases fits within the chunk size limit described [here](https://docs.aws.amazon.com/bedrock/latest/userguide/quotas.html#quotas-kb).

In [None]:
model_id = "anthropic.claude-3-haiku-20240307-v1:0"

# Create the Bedrock runtime client once and reuse it for every call
bedrock = boto3.client(service_name='bedrock-runtime')

def generate_product_summary(model_id, input_text, log_level='ERROR'):
    """Return a short, search-optimized summary of a product description using Claude 3 on Bedrock."""
    level = logging.getLevelName(log_level)
    logging.basicConfig()
    logging.getLogger().setLevel(level)
    logger = logging.getLogger(__name__)
    logger.info(f"Generating summaries with Claude 3 {model_id}")

    accept = "application/json"
    content_type = "application/json"

    system_prompt = "Please output only a useful and search optimized summary of the product description in prose form. Do not output anything else."
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 200,
        "system": system_prompt,
        "messages": [
          {
            "role": "user",
            "content": [
              {
                "type": "text",
                "text": str(input_text)
              }
            ]
          }
        ]
    })

    response = bedrock.invoke_model(
        body=body, modelId=model_id, accept=accept, contentType=content_type
    )
    # The Messages API returns a list of content blocks; the summary is the first text block
    response_body = json.loads(response.get('body').read())['content'][0]['text']

    return response_body

In [None]:
%%time

SUMMARY_THRESHOLD = 1024  # summarize only descriptions longer than this many characters

optim_desc = []
total_summaries = sum(len(str(x)) > SUMMARY_THRESHOLD for x in items['description'])
print(f'Generating a total of {total_summaries} summaries')
for desc in tqdm(items['description']):
    optim_desc.append(
        generate_product_summary(model_id, desc) if len(str(desc)) > SUMMARY_THRESHOLD else desc
    )
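
Because the loop above calls Bedrock only for descriptions longer than 1,024 characters, the selection rule can be separated from the model call. The sketch below (hypothetical names) takes the summarizer as a parameter, so the rule can be tested with a stub; in the notebook it could be used as `summarize_long_descriptions(items['description'], lambda d: generate_product_summary(model_id, d))`.

```python
from typing import Callable, Iterable, List

SUMMARY_THRESHOLD = 1024  # characters; same cut-off as the loop above


def summarize_long_descriptions(
    descriptions: Iterable[str],
    summarize: Callable[[str], str],
    threshold: int = SUMMARY_THRESHOLD,
) -> List[str]:
    """Return the descriptions, replacing only those longer than `threshold` characters with a summary."""
    results = []
    for desc in descriptions:
        text = str(desc)
        # Short descriptions are kept as-is; only long ones trigger a (potentially costly) model call
        results.append(summarize(text) if len(text) > threshold else desc)
    return results
```

Injecting the summarizer keeps the Bedrock call out of the selection logic, which also makes it easy to swap in a different model or a cached summarizer.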

In [None]:
items = items.assign(search_description=optim_desc)
json_str = items[['title','brand','search_description']].to_json(orient='records')
json_obj = json.loads(json_str)
len(json_obj)

#### We save each product as its own file, named by its ASIN (Amazon's unique product identifier), for easy ingestion and retrieval with Amazon Bedrock Knowledge Bases later on.

Each file must be small enough to fit within the chunk size limit described [here](https://docs.aws.amazon.com/bedrock/latest/userguide/quotas.html#quotas-kb), which is why long descriptions were summarized above.

In [None]:
text_list = []

for j, record in enumerate(tqdm(json_obj)):
    asin = items['asin'][j]
    # Store the record's Python dict representation, using single quotes only
    text_clean = str(record).replace('"', "'")
    text_list.append(text_clean)
    # One object per product, named by ASIN, under the Knowledge Base data source prefix
    s3_sess.Bucket(bucket).put_object(
        Key=f'{bedrock_kb_data_path}/{asin}.txt', Body=text_clean.encode('utf-8')
    )
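
The loop above stores a Python dict representation of each record after swapping double quotes for single quotes. If you would rather store valid JSON, a sketch like the following (hypothetical `build_kb_document` helper) builds the same `{prefix}/{asin}.txt` key and serializes the record with `json.dumps`; the upload itself would still be `s3_sess.Bucket(bucket).put_object(Key=key, Body=body)`. This changes the text that Bedrock Knowledge Bases ingests, so treat it as an alternative rather than a required change.

```python
import json
from typing import Dict, Tuple


def build_kb_document(asin: str, record: Dict[str, str], prefix: str) -> Tuple[str, bytes]:
    """Return (S3 key, UTF-8 body) for one product document (hypothetical helper).

    json.dumps always produces valid JSON, so quotes inside titles or
    descriptions are escaped rather than rewritten.
    """
    key = f"{prefix}/{asin}.txt"
    body = json.dumps(record, ensure_ascii=False).encode("utf-8")
    return key, body
```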

In [None]:
len(text_list)

## <a id="1.4">1.4 \[Optional\] Create embeddings</a>

This section shows how to create your own embeddings if you prefer to load them into your own vector database.

It can also serve as a sanity check for the data that will be sent to Amazon Bedrock Knowledge Bases in notebook 2, which manages the embedding process for you.

In [None]:
# model_id = "amazon.titan-embed-text-v1"
model_id = 'cohere.embed-english-v3'

# Create the Bedrock runtime client once and reuse it for every call
bedrock = boto3.client(service_name='bedrock-runtime')


def generate_embeddings(model_id, input_text, log_level='ERROR'):
    """Return the parsed Bedrock response body containing the embedding for `input_text`."""
    level = logging.getLevelName(log_level)
    logging.basicConfig()
    logging.getLogger().setLevel(level)
    logger = logging.getLogger(__name__)
    logger.info("Generating embeddings with %s", model_id)
    logger.info("String length %s", len(input_text))

    accept = "*/*"
    content_type = "application/json"

    ### use the following for titan
    # body = json.dumps({
    #     "inputText": input_text,
    # })

    ### use the following for cohere
    # input_type="search_document" marks the text as a document to be searched (not a query);
    # truncate="END" drops the end of inputs that exceed the model's token limit
    body = json.dumps({
        "texts": [input_text],
        "input_type": "search_document",
        "truncate": "END"
    })

    response = bedrock.invoke_model(
        body=body, modelId=model_id, accept=accept, contentType=content_type
    )

    response_body = json.loads(response.get('body').read())
    return response_body
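
`generate_embeddings` returns the whole parsed response body, whose shape depends on the model. If you want plain vectors (for example, to store in your own vector database), a small hypothetical helper such as `extract_embedding` can normalize the two response shapes. It assumes the default Cohere response, where `embeddings` is a list with one vector per input text, and the Titan response, where `embedding` is a single vector.

```python
from typing import Any, Dict, List


def extract_embedding(model_id: str, response_body: Dict[str, Any]) -> List[float]:
    """Pull the vector out of a parsed Bedrock embedding response (hypothetical helper).

    Cohere Embed (default settings): {"embeddings": [[...], ...]}, one vector per input text.
    Titan Text Embeddings: {"embedding": [...]}.
    """
    if model_id.startswith("cohere."):
        # generate_embeddings sends a single text, so take the first vector
        return response_body["embeddings"][0]
    if model_id.startswith("amazon.titan-embed"):
        return response_body["embedding"]
    raise ValueError(f"Unknown embedding model: {model_id}")
```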

In [None]:
embedding_list = []
for item in tqdm(text_list):
    embedding_list.append(generate_embeddings(model_id, item))

In [None]:
len(embedding_list)

In [None]:
items_embed = items.assign(text_list=text_list, embeddings=embedding_list)
# items_embed.to_csv('items_embed.txt', sep='|', index=False, header=False)
# items_embed.to_csv('items_embed.csv')
items_embed
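
As a quick sanity check on the embeddings, you could compare a few products with cosine similarity: items with similar titles and descriptions should score higher than unrelated ones. A minimal stdlib sketch follows; `cosine_similarity` is a hypothetical helper, and in the notebook the vectors would come from `embedding_list` (for the Cohere response shape, `embedding_list[i]['embeddings'][0]`).

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two equal-length vectors (1.0 = same direction, 0.0 = orthogonal)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)
```

For larger comparisons, the same computation can be vectorized with NumPy, which is already imported in this notebook.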