# Amazon Reviews

#### Overview:

* Import Python Libraries
* Establish Secure Connection to Snowflake using AWS Secrets Manager
* Exploratory Data Analysis using Snowpark DataFrames
  * Load data into Snowpark DataFrames
    * Movies & TV Reviews
    * Musical Instruments Reviews
  * Snowpark DataFrame Actions and Transformations
    * Access Semi-structured Data using Snowpark DataFrame API
    * Combine Movies & TV with Musical Instruments Reviews
    * Replace Missing Values for REVIEWTEXT column with "Not available"
  * Snowpark Python User-Defined Function (UDF) 
    * (Simple) Create and Register UDF named "to_lowercase"
    * Call "to_lowercase" UDF on "SUMMARY" column in Snowpark DataFrame
    * (Complex) Create and Register Natural Language Processing (NLP) UDF using spaCy and BeautifulSoup
    * Call NLP UDF on column "REVIEWTEXT" in Snowpark DataFrame to tokenize Amazon Reviews
  * Snowpark Python Stored Procedure (SP)
    * Create Python Function to Transform Data
    * Register Python Function as Snowpark Stored Procedure to Deploy Code to Snowflake
    * Execute Snowpark Stored Procedure to Transform Data on Snowflake
    * Examine Results - Reviews and Tokens

*Data Source: https://cseweb.ucsd.edu/~jmcauley/datasets/amazon_v2/*


### Import Python Libraries

In [1]:
# Snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import PandasSeries, PandasDataFrame
from snowflake.snowpark.functions import (
    udf, count, avg, sum, col, lit, listagg, call_builtin, when, count_distinct,
    array_agg, array_construct, call_udf, sproc, parse_json, json_extract_path_text, get_path,
)
from snowflake.snowpark.version import VERSION

# Misc
import pandas as pd
import json
from cachetools import cached
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

# For AWS Secrets Manager
import boto3
from botocore.exceptions import ClientError

### Establish Secure Connection to Snowflake using AWS Secrets Manager

*NOTE: Other options include loading credentials from a file (for example, connection.json) or authenticating with Okta, SSO, or MFA.*

In [2]:
# Load Snowflake connection details from AWS Secrets Manager
def get_aws_sf_connection_details(secret_name,region_name):
    
    # Create a Secrets Manager boto3 client
    boto3_session = boto3.session.Session()
    client = boto3_session.client(service_name='secretsmanager',region_name=region_name)
    
    get_secret_value_response = None

    try:
        # Get the secret value(s) for the given secret name
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)['SecretString']
    except ClientError:
        # Typical error codes:
        #   DecryptionFailureException    - Secrets Manager can't decrypt the secret with the provided KMS key.
        #   InternalServiceErrorException - An error occurred on the server side.
        #   InvalidParameterException     - An invalid value was provided for a parameter.
        #   InvalidRequestException       - A parameter value is not valid for the current state of the resource.
        #   ResourceNotFoundException     - The requested secret could not be found.
        # Re-raise in every case so that an unlisted error code cannot fall through and return None.
        raise

    return get_secret_value_response
        
# Create Snowflake Session object
connection_parameters = json.loads(get_aws_sf_connection_details('dash-sfdevrel-connection','us-west-2'))
session = Session.builder.configs(connection_parameters).create()

snowflake_environment = session.sql('select current_role(), current_warehouse(), current_database(), current_schema(), current_version()').collect()
snowpark_version = VERSION

# Current Environment Details
print('Role                        : {}'.format(snowflake_environment[0][0]))
print('Warehouse                   : {}'.format(snowflake_environment[0][1]))
print('Database                    : {}'.format(snowflake_environment[0][2]))
print('Schema                      : {}'.format(snowflake_environment[0][3]))
print('Snowflake version           : {}'.format(snowflake_environment[0][4]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

Role                        : ACCOUNTADMIN
Warehouse                   : DASH_S
Database                    : DASH_DB
Schema                      : DASH_SCHEMA
Snowflake version           : 7.9.0
Snowpark for Python version : 1.0.0
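
The note at the top of this section mentions loading credentials from a file such as connection.json. A minimal sketch of that option follows. The helper name `load_connection_parameters`, the list of required keys, and the file layout are assumptions made for illustration and are not part of the original notebook.

```python
import json
from pathlib import Path

# Hypothetical set of keys this sketch insists on; adjust to your own setup.
REQUIRED_KEYS = ("account", "user", "password")

def load_connection_parameters(path: str) -> dict:
    """Read Snowflake connection parameters from a JSON file (illustrative helper)."""
    params = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(params, dict):
        raise ValueError(f"{path} must contain a JSON object")
    missing = [key for key in REQUIRED_KEYS if key not in params]
    if missing:
        raise KeyError(f"Missing connection parameters: {missing}")
    return params
```

The returned dictionary could then be passed to `Session.builder.configs(...)` in the same way as the parsed Secrets Manager value above. Keep such a file out of version control.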


### Exploratory Data Analysis using Snowpark DataFrames

#### Load data into Snowpark DataFrames

##### Movies & TV Reviews

In [3]:
df_amazon_reviews_movies = session.table('amazon_reviews_movies')
df_amazon_reviews_movies.show()

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"OVERALL"  |"VERIFIED"  |"REVIEWTIME"  |"REVIEWERID"    |"ASIN"      |"STYLE"                |"REVIEWERNAME"  |"REVIEWTEXT"                                        |"SUMMARY"                                           |"UNIXREVIEWTIME"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|5          |True        |07 12, 2010   |A32O1V2PKOE5DY  |B000UR9TKK  |"{'Format:': ' DVD'}"  |Morryt          |I love my collection of the nine seasons of Eve...  |My therape is here!                                 |1278892800        |
|5          |False       |07 5, 2010    |A16GAIJ

##### Musical Instruments Reviews

In [4]:
df_amazon_reviews_musical_inst = session.table('amazon_reviews_musical_instruments')
df_amazon_reviews_musical_inst.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"OVERALL"  |"VERIFIED"  |"REVIEWTIME"  |"REVIEWERID"    |"ASIN"      |"STYLE"  |"REVIEWERNAME"       |"REVIEWTEXT"                                        |"SUMMARY"                                           |"UNIXREVIEWTIME"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|5          |true        |07 7, 2018    |A5NUSGGKE1BNB   |B01H7N27L2  |NULL     |Mason                |Perfect fit and comfortable. Nice pocket placem...  |Great saddle, just what i needed                    |1530921600        |
|5          |true        |03 15, 2018   |A3ELFQ6XR2RBPH  |B01H7N27L2  |NULL     |Bri

### Snowpark DataFrame Actions and Transformations

##### Access Semi-structured Data using Snowpark DataFrame API

In [5]:
df_amazon_reviews_movies.select(json_extract_path_text(col('STYLE'), lit('"Format:"')).alias('VALUE')).show()

-----------
|"VALUE"  |
-----------
| DVD     |
| DVD     |
| DVD     |
| DVD     |
| DVD     |
| DVD     |
| DVD     |
| DVD     |
| DVD     |
| DVD     |
-----------
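
The STYLE values shown earlier are small JSON-like objects whose only key is `Format:` (the colon is part of the key), and the extracted values carry a leading space (` DVD`). The plain-Python sketch below performs the same lookup on a single value outside Snowflake and trims the whitespace. The helper `extract_format` and the double-quoted sample string are assumptions for illustration; Python's `json` module requires double quotes, whereas the table output shows single-quoted text.

```python
import json
from typing import Optional

def extract_format(style_json: Optional[str]) -> Optional[str]:
    """Return the value stored under the "Format:" key, with whitespace trimmed."""
    if style_json is None:
        return None  # mirrors rows where STYLE is NULL
    value = json.loads(style_json).get("Format:")
    return value.strip() if isinstance(value, str) else None

print(extract_format('{"Format:": " DVD"}'))  # prints: DVD
```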



##### Combine Movies & TV with Musical Instruments Reviews

In [5]:
df_amazon_reviews_all = df_amazon_reviews_musical_inst.union(df_amazon_reviews_movies)
df_amazon_reviews_all.show()

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"OVERALL"  |"VERIFIED"  |"REVIEWTIME"  |"REVIEWERID"    |"ASIN"      |"STYLE"  |"REVIEWERNAME"      |"REVIEWTEXT"                                        |"SUMMARY"                                       |"UNIXREVIEWTIME"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|5          |true        |11 24, 2014   |A3A1S9TTUM5RUR  |B003KISCCW  |NULL     |timguitartaylor     |Doesn't get any better than this!                   |Five Stars                                      |1416787200        |
|5          |false       |08 29, 2014   |A1HTFH0H09OBWT  |B003KJ0ZD0  |NULL     |Robert Wulf         |Gr
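
Snowpark's `union` follows SQL UNION semantics and drops duplicate rows, while `union_all` keeps them. The difference affects the record counts reported later. The sketch below illustrates it in plain Python on made-up two-column rows; the helper functions are hypothetical stand-ins, not Snowpark calls.

```python
from typing import List, Tuple

Row = Tuple[str, str]  # (REVIEWERID, SUMMARY), a made-up two-column row

def union_distinct(left: List[Row], right: List[Row]) -> List[Row]:
    """Combine rows and drop exact duplicates, keeping first-seen order."""
    return list(dict.fromkeys(left + right))

def union_all(left: List[Row], right: List[Row]) -> List[Row]:
    """Combine rows and keep every duplicate."""
    return left + right

instruments = [("A1", "Five Stars"), ("A2", "Excellent")]
movies = [("A1", "Five Stars"), ("A3", "My therapy is here!")]

print(len(union_distinct(instruments, movies)))  # 3
print(len(union_all(instruments, movies)))       # 4
```

If every review must be retained, including exact duplicates, `union_all` is the call to consider.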

##### Replace Missing Values for REVIEWTEXT column with "Not available"

In [6]:
null_cnt = df_amazon_reviews_all.filter(col("REVIEWTEXT").is_null()).count()
df_amazon_reviews_all = df_amazon_reviews_all.fillna('Not available',subset=["REVIEWTEXT"])
print(f"Number of records with missing REVIEWTEXT column filled with 'Not available': {null_cnt}")
print(f"Total records: {df_amazon_reviews_all.count()}")

Number of records with missing REVIEWTEXT column filled with 'Not available': 8688
Total records: 10003402
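
The cell above counts the NULL values before calling `fillna`, because nothing is left to count once the fill has been applied. The same order of operations is sketched below on a few in-memory rows in plain Python. The sample rows and the `fill_missing` helper are made up for illustration.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, Optional[str]]

def fill_missing(rows: List[Row], column: str, default: str) -> Tuple[List[Row], int]:
    """Count missing values in `column`, then return copies with them replaced."""
    missing = sum(1 for row in rows if row.get(column) is None)
    filled = [
        {**row, column: default if row.get(column) is None else row[column]}
        for row in rows
    ]
    return filled, missing

sample = [
    {"REVIEWTEXT": "Great replacements.", "SUMMARY": "Five Stars"},
    {"REVIEWTEXT": None, "SUMMARY": "Excellent"},
]
filled, null_cnt = fill_missing(sample, "REVIEWTEXT", "Not available")
print(null_cnt, filled[1]["REVIEWTEXT"])  # 1 Not available
```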


### Snowpark Python User-Defined Function (UDF) 

#### Create and Register UDF named "to_lowercase"

In [9]:
@udf(session=session,name='to_lowercase',replace=True,is_permanent=True,stage_location='@dash_udfs')
def to_lowercase(txt: str) -> str:
    return txt.lower()
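
The handler above calls `txt.lower()` directly, which raises an error if it ever receives `None`. Assuming NULL values in SUMMARY reach the handler as `None`, a guarded variant could look like the sketch below. `to_lowercase_safe` is a hypothetical name, and the function is plain Python so it can be checked locally before being registered as a UDF.

```python
from typing import Optional

def to_lowercase_safe(txt: Optional[str]) -> Optional[str]:
    """Lowercase `txt`, passing None through unchanged (assumed NULL handling)."""
    return None if txt is None else txt.lower()

# Quick local checks before registering the function as a UDF.
assert to_lowercase_safe("Five Stars") == "five stars"
assert to_lowercase_safe(None) is None
```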

#### Call "to_lowercase" UDF on "SUMMARY" column in Snowpark DataFrame

In [10]:
df_amazon_reviews_all.select("SUMMARY",call_udf('to_lowercase',col('SUMMARY')).as_('SUMMARY_LOWERCASE')).show()

-----------------------------------------------------------------------------------------------------------
|"SUMMARY"                                           |"SUMMARY_LOWERCASE"                                 |
-----------------------------------------------------------------------------------------------------------
|Five Stars                                          |five stars                                          |
|Five Stars                                          |five stars                                          |
|Five Stars                                          |five stars                                          |
|Five Stars                                          |five stars                                          |
|Excellent                                           |excellent                                           |
|Kyser                                               |kyser                                               |
|They were easy to put on th

#### Create and Register Natural Language Processing (NLP) UDF using spaCy and BeautifulSoup

In [119]:
session.clear_imports()
session.add_import('@dash_udf_imports/en_core_web_sm.zip.gz')

@cached(cache={})
def load_en_core_web_sm(input_file,output_dir)-> object:
    import zipfile
    import spacy
    with zipfile.ZipFile(input_file, 'r') as zip_ref:
        zip_ref.extractall(output_dir)
    # load and return the english language small model of spacy
    nlp = spacy.load(output_dir + "/en_core_web_sm/en_core_web_sm-2.3.0")
    return nlp

@udf(session=session,packages=['spacy==2.3.5','beautifulsoup4','cachetools==4.2.2'],name='batch_nlp_tokenize_text',replace=True,is_permanent=True,stage_location='@dash_udfs')
def batch_nlp_tokenize_text(df: PandasDataFrame[str]) -> PandasSeries[list]:
    import os
    import sys
    import spacy
    from bs4 import BeautifulSoup 
    from spacy.tokenizer import Tokenizer
                       
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    input_file = import_dir + 'en_core_web_sm.zip'
    output_dir = '/tmp/en_core_web_sm' + str(os.getpid())
    
    nlp = load_en_core_web_sm(input_file,output_dir)    
    tokenizer = Tokenizer(nlp.vocab)
    
    df.columns = ['REVIEWTEXT']

    # strip html
    df['REVIEWTEXT'] = df['REVIEWTEXT'].apply(lambda txt: BeautifulSoup(txt, "html.parser").get_text())
    
    # tokenize, lemmatize verbs and remove stop words
    df['TOKENS'] = df['REVIEWTEXT'].apply(lambda txt: [t.lemma_ for t in tokenizer(txt) if not t.is_stop])

    return df['TOKENS']

#### Call NLP UDF on column "REVIEWTEXT" in Snowpark DataFrame to tokenize Amazon Reviews

##### Sample Size: 10 records

In [7]:
df_amazon_reviews_all.select('REVIEWTEXT',call_udf('batch_nlp_tokenize_text',col('REVIEWTEXT')).as_("TOKENS")).limit(10).show()

--------------------------------------------------------------------------
|"REVIEWTEXT"                                        |"TOKENS"           |
--------------------------------------------------------------------------
|Great, still learning the various adjustment        |[                  |
|                                                    |  "Great,",        |
|                                                    |  "learn",         |
|                                                    |  "adjustment"     |
|                                                    |]                  |
|Great replacements.                                 |[                  |
|                                                    |  "Great",         |
|                                                    |  "replacements."  |
|                                                    |]                  |
|I just recently bought this guitar and have rea...  |[                  |
|                        

### Snowpark Python Stored Procedure (SP)

#### Create Python Function to Transform the Data

In [18]:
def process_amazon_reviews(session: Session) -> str:
  # Load Movies and TV Reviews
  df_amazon_reviews_movies = session.table('amazon_reviews_movies')

  # Load Musical Instruments Reviews
  df_amazon_reviews_musical_inst = session.table('amazon_reviews_musical_instruments')

  # Combine Movies & TV with Musical Instruments Reviews
  df_amazon_reviews_all = df_amazon_reviews_musical_inst.union(df_amazon_reviews_movies)

  # Replace Missing Values for REVIEWTEXT column with "Not available"
  df_amazon_reviews_all = df_amazon_reviews_all.fillna('Not available',subset=["REVIEWTEXT"])

  # Call NLP UDF on column "REVIEWTEXT" to tokenize Amazon Reviews
  df_amazon_reviews_all = df_amazon_reviews_all.select('*',call_udf('batch_nlp_tokenize_text',col('REVIEWTEXT')).as_("TOKENS"))

  # Save all columns including tokens in a new table
  df_amazon_reviews_all.write.mode('overwrite').save_as_table('AMAZON_REVIEWS_WITH_TOKENS')

  return "SUCCESS"

#### Register Python Function as Snowpark Stored Procedure to Deploy Code to Snowflake

In [None]:
# Register function as a Stored Procedure
session.sproc.register(
  func=process_amazon_reviews,
  name="process_amazon_reviews",
  packages=['snowflake-snowpark-python'],
  is_permanent=True,
  stage_location="@dash_sprocs",
  replace=True)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x7fa8a9773e20>

#### Execute Snowpark Stored Procedure to Transform Data on Snowflake

In [19]:
print(session.call('process_amazon_reviews'))

SUCCESS


#### Examine Results - Reviews and Tokens

In [9]:
df_amazon_reviews_all = session.table('AMAZON_REVIEWS_WITH_TOKENS').select('REVIEWTEXT','TOKENS')
df_amazon_reviews_all.show()

-------------------------------------------------------------------------------
|"REVIEWTEXT"                                        |"TOKENS"                |
-------------------------------------------------------------------------------
|This 3rd installment of the series was disappoi...  |[                       |
|                                                    |  "3rd",                |
|                                                    |  "installment",        |
|                                                    |  "series",             |
|                                                    |  "disappointing.",     |
|                                                    |  "Kate",               |
|                                                    |  "Beckinsale,",        |
|                                                    |  "remind",             |
|                                                    |  "3rd",                |
|                                       