### Amazon Review Dataset


In [6]:
!pip install awswrangler

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [1]:
import awswrangler as wr 
import pandas as pd 

In [2]:
# Distinct (product_id, product_category, product_title) rows for every product
# reviewed by a customer who has at least 5 reviews in rows with year >= 2015.
# Only the customer selection is year-filtered; the join back to
# amazon_reviews_parquet is not.
product_query = '''SELECT distinct product_id, product_category, product_title
    FROM 
    (SELECT *
    FROM 
        (SELECT customer_id,
         count(*) AS cnt
        FROM (select * from amazon_reviews_parquet where year>=2015)
        GROUP BY  customer_id)
        WHERE cnt >= 5 ) AS a
    JOIN amazon_reviews_parquet AS pca
    ON pca.customer_id = a.customer_id'''

# Run the query through Athena and load the result into a pandas DataFrame.
item_df = wr.athena.read_sql_query(
    sql=product_query,
    database="default",
    ctas_approach=False,
)

In [3]:
# Preview the first rows of the product frame returned by Athena.
item_df.head()

Unnamed: 0,product_id,product_category,product_title
0,B001T4GX5C,Outdoors,Lucky Hardware Lucky Green
1,B003VWXXXK,Kitchen,Ninja Professional Blender (NJ600) (Discontinued)
2,B002SMJQT4,Health_&_Personal_Care,Adaptive Sound Technologies - Sound+Sleep
3,B001EQ56DA,Grocery,EnviroKidz Organic Chocolate Koala Crisp Cereal
4,B005SPZG5Q,Baby,Brica Drink Pod


In [4]:
# Row count, column dtypes and memory footprint of the product frame.
item_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11570547 entries, 0 to 11570546
Data columns (total 3 columns):
 #   Column            Dtype 
---  ------            ----- 
 0   product_id        string
 1   product_category  string
 2   product_title     string
dtypes: string(3)
memory usage: 264.8 MB


In [5]:
# One row per review written by a customer who has at least 5 reviews in rows
# with year >= 2015 (the join itself is not year-filtered).
# NOTE(review): review_date*86400 presumably converts days-since-epoch into
# epoch seconds -- confirm against the table schema.
user_item_query = '''SELECT product_id, product_category,a.customer_id, star_rating, review_date*86400 as timestamp 
    FROM 
    (SELECT *
    FROM 
        (SELECT customer_id,
         count(*) AS cnt
        FROM (select * from amazon_reviews_parquet where year>=2015)
        GROUP BY  customer_id)
        WHERE cnt >= 5 ) AS a
    JOIN amazon_reviews_parquet AS pca
    ON pca.customer_id = a.customer_id'''

# Run the query through Athena and load the interactions into a pandas DataFrame.
ui_df = wr.athena.read_sql_query(
    sql=user_item_query,
    database="default",
    ctas_approach=False,
)

In [6]:
# Preview the first rows of the user-item interaction frame.
ui_df.head()

Unnamed: 0,product_id,product_category,customer_id,star_rating,timestamp
0,B009SA8XSY,Music,6673125,2,1425168000
1,B0012C7JO0,Outdoors,6673125,5,1425168000
2,B00DBJKYQC,Sports,6673125,4,1431734400
3,B0073MG11A,Automotive,6673125,4,1382918400
4,B004DNWVOI,Home_Improvement,6673125,5,1431734400


In [7]:
# Row count, column dtypes and memory footprint of the interaction frame.
ui_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55921681 entries, 0 to 55921680
Data columns (total 5 columns):
 #   Column            Dtype 
---  ------            ----- 
 0   product_id        string
 1   product_category  string
 2   customer_id       string
 3   star_rating       Int32 
 4   timestamp         Int64 
dtypes: Int32(1), Int64(1), string(3)
memory usage: 2.0 GB


### Prepare to insert into Elasticsearch

In [8]:
!pip install requests 
!pip install requests-aws4auth
!pip install Elasticsearch==7.7.0 
!pip install urllib3

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
Collecting requests-aws4auth
  Downloading requests_aws4auth-1.0.1-py2.py3-none-any.whl (29 kB)
Installing collected packages: requests-aws4auth
Successfully installed requests-aws4auth-1.0.1
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
Collecting Elasticsearch==7.7.0
  Downloading elasticsearch-7.7.0-py2.py3-none-any.whl (99 kB)
[K     |████████████████████████████████| 99 kB 2.1 MB/s  eta 0:00:01
Installing collected packages: Elasticsearch
Successfully installed Elasticsearch-7.7.0
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [9]:
# Host name only, without an "https://" prefix: the value is used directly as
# the client host and is interpolated into an https:// URL further down.
# strip() guards against whitespace picked up when pasting the endpoint.
elastic_search_endpoint = input("Elasticsearch endpoint host (no scheme): ").strip()

search-amazon-review-demo-6z6ub2pwv5lfhpgvqnyxjb2g6q.us-east-1.es.amazonaws.com


In [11]:
from requests_aws4auth import AWS4Auth
import boto3

# Build the SigV4 signer for the Elasticsearch domain from the default boto3
# session (region and credentials of the identity running this notebook).
host = elastic_search_endpoint
session = boto3.session.Session()
region = session.region_name
if region is None:
    # Without a region the signer cannot produce a valid signature; fail here
    # with a clear message instead of on the first signed request.
    raise RuntimeError("No AWS region is configured for the default boto3 session")

service = 'es'
# Reuse the session created above rather than constructing a second one.
credentials = session.get_credentials()
if credentials is None:
    raise RuntimeError("No AWS credentials were found for the default boto3 session")
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)


In [None]:
# Label the input box so it is not confused with the other prompts in this
# notebook; strip() drops whitespace picked up when pasting.
master_user = input("Elasticsearch master user name: ").strip()

In [None]:
from getpass import getpass

# getpass() does not echo what is typed, so the password is not left behind in
# the rendered cell output the way it would be with input().
master_user_password = getpass("Elasticsearch master user password: ")

In [13]:
import boto3
import json
from elasticsearch import Elasticsearch, RequestsHttpConnection
def connectES(esEndPoint):
    """Build an Elasticsearch client for an HTTPS endpoint on port 443.

    Requests are authenticated with the notebook-level ``awsauth`` object, so
    the cell that defines ``awsauth`` must have been executed first.

    Args:
        esEndPoint: Host name of the Elasticsearch domain (no scheme).

    Returns:
        The configured ``Elasticsearch`` client.

    Raises:
        Exception: Whatever client construction raised, re-raised after the
            failure has been printed.
    """
    print ('Connecting to the ES Endpoint {0}'.format(esEndPoint))
    try:
        # NOTE(review): constructing the client does not appear to issue a
        # request, so an unreachable endpoint likely only fails on first use --
        # confirm against the client documentation.
        return Elasticsearch(
            hosts=[{'host': esEndPoint, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
        )
    except Exception as E:
        print("Unable to connect to {0}".format(esEndPoint))
        print(E)
        # Re-raise instead of calling exit(): exit() is the interactive-shell
        # helper and, inside a notebook, shuts the kernel down rather than
        # showing the traceback in the failing cell.
        raise

In [14]:
# Client used by the indexing cell below; connectES reads the notebook-level
# awsauth, so the auth cell must have run before this one.
esClient = connectES(elastic_search_endpoint)

Connecting to the ES Endpoint search-amazon-review-demo-6z6ub2pwv5lfhpgvqnyxjb2g6q.us-east-1.es.amazonaws.com


In [17]:
def indexDocElement(esClient, response):
    """Index one document into the ``product`` index (document type ``product``).

    Args:
        esClient: Client object exposing ``index(index=..., doc_type=..., body=...)``.
        response: The document body to index (e.g. one product record as a dict).

    Returns:
        Whatever ``esClient.index`` returns for the request.

    Raises:
        Exception: The error raised by ``esClient.index``, re-raised after the
            failure has been printed.
    """
    try:
        return esClient.index(index='product', doc_type='product', body=response)
    except Exception as E:
        print("Doc not indexed")
        print("Error: ", E)
        # Re-raise instead of calling exit(): exit() is the interactive-shell
        # helper and, inside a notebook, shuts the kernel down rather than
        # reporting which document failed via a normal traceback.
        raise

In [None]:
import json

from elasticsearch import helpers

# Replace missing values with empty strings so every field in a record is a
# plain string when it is serialised.
item_df = item_df.fillna("")
item_arr = item_df.to_dict(orient="records")

# Send the records through the bulk API in batches instead of issuing one
# HTTP request per row. Index and document type are the same ones
# indexDocElement writes to ('product' / 'product'). With the helper's default
# settings a failed document raises instead of being skipped.
indexed_count, index_errors = helpers.bulk(
    esClient,
    ({'_index': 'product', '_type': 'product', '_source': doc} for doc in item_arr),
)

In [24]:
import requests
# Query-string search for "black" in product_title, capped at 100 hits.
# The timeout keeps the cell from hanging indefinitely on an unresponsive
# endpoint, and raise_for_status() surfaces HTTP errors (e.g. a rejected
# signature) instead of parsing an error payload as if it were search results.
r = requests.get(
    'https://{}/product/product/_search?q=product_title:black&size=100'.format(elastic_search_endpoint),
    auth=awsauth,
    timeout=30,
)
r.raise_for_status()
rjson = r.json()
rjson

{'took': 58,
 'timed_out': False,
 '_shards': {'total': 5, 'successful': 5, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 10000, 'relation': 'gte'},
  'max_score': 5.191208,
  'hits': [{'_index': 'product',
    '_type': 'product',
    '_id': 'nOS7V3YBWQlvJU18Ij_u',
    '_score': 5.191208,
    '_source': {'product_id': 'B004WED3YK',
     'product_category': 'Digital_Music_Purchase',
     'product_title': 'Black Is Black'}},
   {'_index': 'product',
    '_type': 'product',
    '_id': 'lfjtU3YB1EYWinhnpehP',
    '_score': 5.191208,
    '_source': {'product_id': 'B0044I7GP6',
     'product_category': 'Digital_Music_Purchase',
     'product_title': 'Black Is Black'}},
   {'_index': 'product',
    '_type': 'product',
    '_id': '1A0AXHYB1EYWinhnm9_A',
    '_score': 5.191208,
    '_source': {'product_id': 'B0013GO22M',
     'product_category': 'Digital_Music_Purchase',
     'product_title': 'Black Is Black'}},
   {'_index': 'product',
    '_type': 'product',
    '_id': 'I0t4VnYB7Zc

### Save ui_df and item_df (as user_item_df.p and item_df.p) for later use

In [21]:
# Write both frames as pickle files in the current working directory.
ui_df.to_pickle(path="user_item_df.p")    # user-item interactions
item_df.to_pickle(path="item_df.p")       # product frame


In [22]:
# Persist the endpoint with IPython's %store magic so it can be restored with
# `%store -r elastic_search_endpoint` in another kernel.
%store elastic_search_endpoint

Stored 'elastic_search_endpoint' (str)
