1.0. Libraries load.

In [2]:
# libraries load
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np
import string
import re

2.0. Amazon Review dataset load.

In [4]:
from pyspark.sql.types import *

# Explicit schema applied when reading the reviews file. Each pair below is
# (column name, Spark type); fields use StructField's default nullability.
reviews_df_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ('marketplace', StringType()),
        ('customer_id', IntegerType()),
        ('review_id', StringType()),
        ('product_id', StringType()),
        ('product_parent', IntegerType()),
        ('product_title', StringType()),
        ('product_category', StringType()),
        ('star_rating', IntegerType()),
        ('helpful_votes', IntegerType()),
        ('total_votes', IntegerType()),
        ('vine', StringType()),
        ('verified_purchase', StringType()),
        ('review_headline', StringType()),
        ('review_body', StringType()),
        ('review_date', DateType()),
    )
])

# Load the tab-delimited reviews file using the explicit schema (no inference),
# treating the literal 'NA' as null. After loading: lower-case the product
# title and review headline, trim the review body, and drop every row that
# still contains a null in any column.
# NOTE(review): default CSV quote handling is in effect here -- confirm it is
# appropriate for this TSV.
reviews_df = (
    sqlContext.read.format('com.databricks.spark.csv')
    .options(
        inferSchema=False,
        delimiter='\t',
        nullValue='NA',
        header=True,
        encoding='UTF-8',
    )
    .schema(reviews_df_schema)
    .load("/FileStore/tables/amazon_reviews_multilingual_US_v1_00_tsv-2db74.gz")
    .withColumn('product_title', lower(col('product_title')))
    .withColumn('review_headline', lower(col('review_headline')))
    .withColumn('review_body', trim(col('review_body')))
    .dropna()
)

# Mark the data frame for caching so the actions below can reuse it.
reviews_df.cache()

# Explore the review data frame: preview the first four rows.
# Alternative preview: display(reviews_df.take(4))
reviews_df.show(4)

# Summary counts: distinct marketplaces, distinct product categories, total rows.
print(
    "Number of marketplace: "
    + str(reviews_df.select('marketplace').distinct().count())
)
print(
    "Number of product_category: "
    + str(reviews_df.select('product_category').distinct().count())
)
print(
    "Number of reviews: "
    + str(reviews_df.count())
)

3.0. Review normalization.
- ASCII punctuation, digits, and carriage-return/tab/newline characters are replaced with spaces; the cleaned text is then trimmed.

In [6]:
# Punctuation-removal helper.
# The pattern matches any single ASCII punctuation character, digit, carriage
# return, tab or newline. It is compiled once here rather than on every call.
_PUNCT_DIGIT_BREAK_RE = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')


def remove_punct(text):
    """Replace punctuation, digits and CR/tab/newline characters with spaces.

    Each matched character is replaced by exactly one space, so the result has
    the same length as the input; callers trim/collapse whitespace themselves.

    Args:
        text: The string to clean, or None.

    Returns:
        The cleaned string, or None when ``text`` is None (so a null value
        passes through instead of raising TypeError inside ``re.sub``).
    """
    if text is None:
        return None
    return _PUNCT_DIGIT_BREAK_RE.sub(" ", text)

# Spark UDF wrapper around remove_punct (udf's default return type is string).
# The lambda wrapper is kept on purpose so the UDF's generated name is the
# same as before for any other code that uses punct_remover.
punct_remover = udf(lambda x: remove_punct(x))

# Apply punct_remover to the three free-text columns, trim the result, and
# alias each cleaned column back to its original name inside the select.
# Aliasing explicitly avoids depending on Spark's auto-generated
# '<lambda>(column)' name, where a mismatch would make a rename silently do
# nothing. Column order is unchanged.
reviews_df_cleaned = reviews_df.select(
    'marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent',
    trim(punct_remover('product_title')).alias('product_title'),
    'product_category', 'star_rating', 'helpful_votes', 'total_votes',
    'vine', 'verified_purchase',
    trim(punct_remover('review_headline')).alias('review_headline'),
    trim(punct_remover('review_body')).alias('review_body'),
    'review_date'
)

reviews_df_cleaned.show(4, truncate=100)


3.1. Review rating conversion.

In [8]:
# Convert a numeric star rating into a sentiment label.
def convert_rating(rating):
    """Map a star rating to 'negative', 'neutral' or 'positive'.

    Args:
        rating: Numeric star rating, or None.

    Returns:
        'negative' when rating < 3, 'neutral' when rating == 3, 'positive'
        otherwise; None when ``rating`` is None (comparing None with an int
        raises TypeError on Python 3, so nulls are passed through instead).
    """
    if rating is None:
        return None
    if rating < 3:
        return 'negative'
    elif rating == 3:
        return 'neutral'
    else:
        return 'positive'

# Spark UDF wrapper around convert_rating (udf's default return type is string).
# The lambda wrapper is kept on purpose so the UDF's generated name is the
# same as before for any other code that uses rating_convert.
rating_convert = udf(lambda x: convert_rating(x))

# Replace star_rating with its sentiment label, aliased directly to 'rating'.
# Aliasing in the select avoids depending on Spark's auto-generated
# '<lambda>(star_rating)' column name for a later rename. Column order is
# unchanged.
reviews_df_new = reviews_df_cleaned.select(
    'marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent',
    'product_title', 'product_category',
    rating_convert('star_rating').alias('rating'),
    'helpful_votes', 'total_votes',
    'vine', 'verified_purchase', 'review_headline', 'review_body', 'review_date'
)

reviews_df_new.show(4)

4.0. Selection of important columns

In [10]:
# Keep only the columns used downstream; identifiers such as customer_id,
# review_id, product_parent and helpful_votes are left out of this projection.
reviews_df_final = reviews_df_new.select([
    'marketplace',
    'product_id',
    'product_title',
    'product_category',
    'rating',
    'total_votes',
    'vine',
    'verified_purchase',
    'review_headline',
    'review_body',
    'review_date',
])

# Preview the first four rows of the projected data frame.
reviews_df_final.show(4)