## Part 1: Data Ingestion
Directly importing from S3

In [None]:
import os
import boto3

# Set the AWS access key and secret key
os.environ['AWS_ACCESS_KEY_ID'] = 'USE_YOUR_KEYS'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'USE_YOUR_KEYS'

# Create an S3 client
s3 = boto3.client('s3')

# Set the name of the S3 bucket
bucket_name = 'amazon-reviews'

# Try to list the contents of the bucket
try:
    # Use the S3 client to list the objects in the bucket
    response = s3.list_objects(Bucket=bucket_name)
    print(f'Access to S3 bucket {bucket_name} is successful')
    print(response)
except Exception as e:
    print(f'Error: {e}')


Access to S3 bucket amazon-reviews is successful
{'ResponseMetadata': {'RequestId': 'HJ3B0KA3Y4G6KRVT', 'HostId': 'ujKGbOrKr5VWxTfGnir5Do9PTz+AuUfLOGWK2gCSPGcIBKcSgQG0k8U8GVCGaCDH7rp+gHiozAY=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'ujKGbOrKr5VWxTfGnir5Do9PTz+AuUfLOGWK2gCSPGcIBKcSgQG0k8U8GVCGaCDH7rp+gHiozAY=', 'x-amz-request-id': 'HJ3B0KA3Y4G6KRVT', 'date': 'Fri, 20 Jan 2023 03:26:32 GMT', 'x-amz-bucket-region': 'us-west-2', 'content-type': 'application/xml', 'transfer-encoding': 'chunked', 'server': 'AmazonS3'}, 'RetryAttempts': 1}, 'IsTruncated': False, 'Marker': '', 'Contents': [{'Key': 'amazon_reviews_us_Books_v1_00.tsv.gz', 'LastModified': datetime.datetime(2023, 1, 17, 16, 38, 4, tzinfo=tzlocal()), 'ETag': '"853da645a25daf3d3c86559dde032f82-160"', 'Size': 2740337188, 'StorageClass': 'STANDARD', 'Owner': {'DisplayName': 'info', 'ID': 'f164a2c4b766eb258859f6a107c41f628bbcad023221405d299ecb211ca16cb0'}}], 'Name': 'amazon-reviews', 'Prefix': '', 'MaxKeys': 1000, 'Encod

In [None]:
import pyspark
from delta import *
from pyspark.sql import SparkSession

# Define the input and output formats and paths and the table name.
write_format = 'delta'
#load_path = 'file:/databricks/driver/bank.csv'
save_path = 'file:/databricks/driver/tmp/delta/amazon-reviews-2g'
table_name = 'default.amazonreviews2g'


# Define the S3 path to the TSV.gz file
s3_path = "s3a://amazon-reviews/amazon_reviews_us_Books_v1_00.tsv.gz"

# Create a SparkSession
builder = pyspark.sql.SparkSession.builder.appName("amazon-reviews") \
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

#spark = SparkSession.builder.appName("amazon-reviews").getOrCreate()

spark = configure_spark_with_delta_pip(builder).getOrCreate()


# Create a DataFrame from the TSV.gz file
df = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load(s3_path)

# Show the DataFrame
#df.show()

#df.printSchema()

df.createOrReplaceTempView(table_name)
# df.write.saveAsTable(table_name, path=save_path)

df.write.mode("overwrite").saveAsTable(table_name, path=save_path)



In [None]:
ls -lh /databricks/driver/tmp/delta/amazon-reviews-2g

total 3.9G
drwxr-xr-x 2 root root 4.0K Jan 20 03:31 [0m[01;34m_delta_log[0m/
-rw-r--r-- 1 root root 3.9G Jan 20 03:31 part-00000-ce14c406-1ab6-4c82-9152-fc27f27d8755-c000.snappy.parquet


## Part 2: Exploring The Data

In [None]:
read_format = 'delta'
load_path = 'file:/databricks/driver/tmp/delta/amazon-reviews-2g'

# read from the table instead of s3
df = spark.read.format(read_format).option("inferSchema", "true").load(load_path)
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [None]:
# Count the number of rows where the "date" column is null
null_rows = df.filter(df["review_date"].isNull()).count()

# Print the result
print("Number of rows with null date:", null_rows)


Number of rows with null date: 2288


## Part 2.1: Feature Engineering

After a brief inspection of the data, we found there are a series of data pre-processing we have to conduct.

* Remove the “Title” feature.
* Remove the rows where “Review Text” were missing.
* Clean “Review Text” column.
* Determine the best method to calculate sentiment polarity

In [None]:
from pyspark.sql.functions import regexp_replace, trim

df = df.drop('marketplace')
df = df.drop(*['customer_id', 'review_id', 'product_id'])
df = df.drop(*['product_parent', 'product_category', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase'])

df = df.dropna(subset=["review_body"])

def preprocess_df(df):
    df = df.withColumn("review_body", regexp_replace("review_body", "(<br/>)", ""))
    df = df.withColumn("review_body", regexp_replace("review_body", "(<a).*(>).*(</a>)", ""))
    df = df.withColumn("review_body", regexp_replace("review_body", "(&amp)", ""))
    df = df.withColumn("review_body", regexp_replace("review_body", "(&gt)", ""))
    df = df.withColumn("review_body", regexp_replace("review_body", "(&lt)", ""))
    df = df.withColumn("review_body", regexp_replace("review_body", "(\\xa0)", " "))
    return df

df = preprocess_df(df)

df.show()



+--------------------+-----------+--------------------+--------------------+-----------+
|       product_title|star_rating|     review_headline|         review_body|review_date|
+--------------------+-----------+--------------------+--------------------+-----------+
|There Was an Old ...|          5|          Five Stars|I love it and so ...| 2015-08-31|
|      I Saw a Friend|          5|Please buy "I Saw...|My wife and I ord...| 2015-08-31|
|Black Lagoon, Vol. 6|          5|       Shipped fast.|Great book just l...| 2015-08-31|
|           If I Stay|          5|          Five Stars|        So beautiful| 2015-08-31|
|Stars 'N Strips F...|          5|          Five Stars|Enjoyed the autho...| 2015-08-31|
|            The Liar|          2|PREDICTABLE ALMOS...|Two or three page...| 2015-08-31|
|Devil in the Deta...|          5|The Monastery Mur...|&#34;Secrets in t...| 2015-08-31|
|Knowing When to S...|          5|          Five Stars|          I love it!| 2015-08-31|
|The American Pageant

In [None]:
# Using pipeline class to make predictions from models available in the Hub in an easy way 
from transformers import pipeline
from pyspark.sql.functions import col
from transformers import pipeline
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import array

# Initialize the sentiment analysis pipeline
sentiment_pipeline = pipeline('sentiment-analysis', model='finiteautomata/bertweet-base-sentiment-analysis')

# Apply the pipeline to each review
# sentiments = df.rdd.map(lambda row: sentiment_pipeline(row['review_body'])[0]['label']).toDF(['sentiment'])
sentiments = df.rdd.map()

# Add the sentiment column back to the original dataframe
df = df.withColumn('sentiment', sentiments['sentiment'])




emoji is not installed, thus not converting emoticons or emojis into text. Install emoji: pip3 install emoji==0.6.0


[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-2700848159738089>:12[0m
[1;32m      9[0m sentiment_pipeline [38;5;241m=[39m pipeline([38;5;124m'[39m[38;5;124msentiment-analysis[39m[38;5;124m'[39m, model[38;5;241m=[39m[38;5;124m'[39m[38;5;124mfiniteautomata/bertweet-base-sentiment-analysis[39m[38;5;124m'[39m)
[1;32m     11[0m [38;5;66;03m# Apply the pipeline to each review[39;00m
[0;32m---> 12[0m sentiments [38;5;241m=[39m df[38;5;241m.[39mrdd[38;5;241m.[39mmap([38;5;28;01mlambda[39;00m row: sentiment_pipeline(row[[38;5;124m'[39m[38;5;124mreview_body[39m[38;5;124m'[39m])[[38;5;241m0[39m][[38;5;124m'[39m[38;5;124mlabel[39m[38;5;124m'[39m])[38;5;241m.[39mtoDF([[38;5;124m'[39m[38;5;124msentiment[39m[38;5;124m'[39m])
[1;32m     14[0m [38;5;66;03m# Add the sentiment column back to the origi

## Part 3: Next Steps

* Create new feature for the length of the review.
* Create new feature for the word count of the review.
* Single variable visualization with Plotly
* The distribution of review ratings
* The distribution of reviews lengths
* The distribution of reviews word counts
* The distribution of topics, e.g., Books on Machine Learing, Fiction, etc.
* Top 20 one-words in review after removing stop words
* Top 20 two-words (bigrams) used in reviews after removing stop words
* Top 20 trigrams used in reviews after removing stop words
* The distribution of top parts-of-speech (PoS) in the review corpus
* Sentiment Polarity Boxlet of Books Topics - Ideally Machine Learning