In [0]:
def _read_text_widget(widget_name):
    """Declare a text widget with an empty default and return its current value."""
    dbutils.widgets.text(widget_name, "")
    return dbutils.widgets.get(widget_name)


# Notebook parameters: each widget is declared and then read straight away.
review_file_name = _read_text_widget("review_file_name")
metadata_file_name = _read_text_widget("metadata_file_name")
bucket_name = _read_text_widget("bucket_name")
main_category = _read_text_widget("main_category")

In [0]:
%pip install demoji

In [0]:
from pyspark.sql.functions import col, when, sum, coalesce, lit, concat_ws
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, LongType, FloatType, MapType, ArrayType
import re
import demoji

In [0]:
# Never paste real AWS keys into the notebook source. The key pair is read from
# the standard AWS environment variables; when a variable is not set the value
# falls back to the empty string that used to be hard-coded here.
# NOTE(review): a Databricks secret scope would be another option — confirm
# which mechanism this workspace uses.
import os

ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "")
SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")

In [0]:
# Hand the key pair and the regional S3 endpoint to the S3A connector.
aws_region = "us-east-1"
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", SECRET_KEY)
hadoop_conf.set("fs.s3a.endpoint", f"s3.{aws_region}.amazonaws.com")

# Review DataFrame

## Review Dataset Schema 

In [0]:
# (column name, Spark type) for every field of a review record.
_review_columns = [
    ("rating", FloatType()),
    ("title", StringType()),
    ("text", StringType()),
    ("images", ArrayType(StringType())),
    ("asin", StringType()),
    ("parent_asin", StringType()),
    ("user_id", StringType()),
    ("timestamp", LongType()),
    ("helpful_vote", IntegerType()),
    ("verified_purchase", BooleanType()),
]

# Every field is declared nullable.
review_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in _review_columns]
)

## Read Dataset From S3

In [0]:
# Load the review JSON from the bucket's raw/ prefix with the explicit schema.
review_path = f's3a://{bucket_name}/raw/{review_file_name}'
review_df = spark.read.json(review_path, schema=review_schema)
display(review_df)

In [0]:
# Row count of the reviews as loaded.
review_row_count = review_df.count()
print("Number of rows:", review_row_count)

In [0]:
# One indicator column per review column: 1 where the value is null, 0 otherwise.
null_flags = [
    when(col(name).isNull(), 1).otherwise(0).alias(name)
    for name in review_df.columns
]
null_counts = review_df.select(null_flags)

# Adding the indicators up yields the number of nulls in each column.
# NOTE: `sum` is pyspark.sql.functions.sum (imported above), not the builtin.
null_totals = [sum(col(name)).alias(name) for name in null_counts.columns]
null_counts_summary = null_counts.agg(*null_totals)

display(null_counts_summary)

In [0]:
# Discard every review that has a null in any column
# (DataFrame.na.drop is the alias of DataFrame.dropna).
review_df = review_df.na.drop()

review_df.show()

In [0]:
# Keep only the review columns that are carried forward.
review_columns = [
    "user_id", "asin", "parent_asin", "title",
    "text", "rating", "verified_purchase", "timestamp",
]
filtered_review_df = review_df.select(*review_columns)

display(filtered_review_df)


In [0]:
# Row count after the column selection.
selected_row_count = filtered_review_df.count()
print("Number of rows:", selected_row_count)

In [0]:
# Retain only the reviews flagged as verified purchases.
is_verified = col("verified_purchase") == True
filtered_review_df = filtered_review_df.where(is_verified)

display(filtered_review_df)

In [0]:
# Row count after keeping verified purchases only.
verified_row_count = filtered_review_df.count()
print("Number of rows:", verified_row_count)

In [0]:
def clean_text(text):
    """Normalise free-form text into lowercase plain text.

    Steps, in order: replace HTML tags with a space, strip emoji through
    ``demoji``, collapse runs of ``!`` into one, drop every character that is
    not an ASCII letter, a digit, whitespace or one of ``.!?``, lowercase the
    result, and squeeze whitespace runs into single spaces.

    Args:
        text: The raw string, or ``None``.

    Returns:
        The cleaned string, or ``None`` when ``text`` is ``None``.
    """
    # A null value has nothing to clean; pass it through instead of letting
    # re.sub raise TypeError (which would fail the whole Spark task in a UDF).
    if text is None:
        return None

    # Line breaks first, then any remaining tag; both become a space so the
    # words on either side of a tag do not get glued together.
    text = re.sub(r'<br />', ' ', text)
    text = re.sub(r'<.*?>', ' ', text)

    text = demoji.replace(text, '')

    # "!!!" -> "!"
    text = re.sub(r'[!]+', '!', text)

    # Keep ASCII letters, digits, whitespace and sentence punctuation only.
    text = re.sub(r'[^a-zA-Z0-9\s.!?]', '', text)

    text = text.lower()

    # Collapse whitespace runs and trim both ends.
    text = re.sub(r'\s+', ' ', text).strip()

    return text

In [0]:
# Expose clean_text to Spark as a string-returning UDF.
clean_text_udf = udf(clean_text, StringType())

# Clean the review body first, then the review title, overwriting each column.
for text_column in ("text", "title"):
    filtered_review_df = filtered_review_df.withColumn(
        text_column, clean_text_udf(col(text_column))
    )

display(filtered_review_df)

# Metadata DataFrame

In [0]:
# Nested struct for `images`: one nullable string array per key.
images_struct = StructType([
    StructField(key, ArrayType(StringType()), True)
    for key in ("hi_res", "large", "thumb", "variant")
])

# Nested struct for `videos`: one nullable string array per key.
videos_struct = StructType([
    StructField(key, ArrayType(StringType()), True)
    for key in ("title", "url", "user_id")
])

# Explicit schema for the product metadata records; every field is nullable.
meta_schema = StructType([
    StructField("main_category", StringType(), True),
    StructField("title", StringType(), True),
    StructField("average_rating", FloatType(), True),
    StructField("rating_number", IntegerType(), True),
    StructField("features", ArrayType(StringType()), True),
    StructField("description", ArrayType(StringType()), True),
    StructField("price", StringType(), True),
    StructField("images", images_struct, True),
    StructField("videos", videos_struct, True),
    StructField("store", StringType(), True),
    StructField("categories", ArrayType(StringType()), True),
    StructField("details", MapType(StringType(), StringType()), True),
    StructField("parent_asin", StringType(), True),
    StructField("bought_together", StringType(), True),
    StructField("subtitle", StringType(), True),
    StructField("author", StringType(), True),
])

In [0]:
# Load the metadata JSON from the bucket's raw/ prefix with the explicit schema.
meta_path = f's3a://{bucket_name}/raw/{metadata_file_name}'
meta_df = spark.read.json(meta_path, schema=meta_schema)
display(meta_df)

In [0]:
# Keep only the metadata columns that are used further down.
meta_columns = [
    "main_category", "title", "average_rating", "rating_number",
    "features", "description", "images", "categories", "parent_asin",
]
meta_df = meta_df.select(*meta_columns)

display(meta_df)

In [0]:
# Pick a category value to back-fill missing `main_category` entries with.
# Nulls are excluded before sampling: distinct() keeps null as a value of its
# own, so limit(1) could otherwise hand back null as the "fill" value.
# NOTE(review): when the file holds several categories the one chosen is
# arbitrary — confirm each metadata file really has a single category.
known_categories = (
    meta_df
    .select("main_category")
    .where(col("main_category").isNotNull())
    .distinct()
    .limit(1)
    .collect()
)

# An empty result (no rows, or every category null) leaves the frame untouched
# instead of failing with IndexError on collect()[0].
unique_main_category = (
    known_categories[0]["main_category"] if known_categories else None
)

if unique_main_category is not None:
    meta_df = meta_df.fillna({"main_category": unique_main_category})

In [0]:
# Row count of the metadata frame.
meta_row_count = meta_df.count()
print("Number of rows:", meta_row_count)

In [0]:
def merge_text(col_1, col_2):
    """Combine two column values into a single cleaned string.

    ``None`` counts as an empty string and a list is joined with single
    spaces; any other value is used as it is. The two pieces are concatenated
    with one space between them and the result is passed through
    ``clean_text``.
    """
    pieces = []
    for value in (col_1, col_2):
        if value is None:
            value = ""
        pieces.append(" ".join(value) if isinstance(value, list) else value)

    first, second = pieces
    return clean_text(first + " " + second)

In [0]:
# Expose merge_text to Spark as a string-returning UDF.
merge_text_udf = udf(merge_text, StringType())

# Build one cleaned text column out of the feature and description arrays.
meta_df = meta_df.withColumn(
    "Text Data",
    merge_text_udf(col("features"), col("description")),
)

display(meta_df)

# Combine Both DataFrames

In [0]:
# Source column -> display name for the review frame.
review_column_labels = {
    'user_id': 'User ID',
    'asin': 'ASIN',
    'parent_asin': 'Parent ASIN',
    'title': 'Review Title',
    'text': 'Review Text',
    'rating': 'Rating',
    'verified_purchase': 'Verified Purchase',
}
for source_name, label in review_column_labels.items():
    filtered_review_df = filtered_review_df.withColumnRenamed(source_name, label)

# Source column -> display name for the metadata frame. Both frames end up
# with a 'Parent ASIN' column.
meta_column_labels = {
    'main_category': 'Main Category',
    'title': 'Product Title',
    'average_rating': 'Average Rating',
    'rating_number': 'Rating Number',
    'features': 'Features',
    'description': 'Descriptions',
    'images': 'Images',
    'categories': 'Categories',
    'parent_asin': 'Parent ASIN',
}
for source_name, label in meta_column_labels.items():
    meta_df = meta_df.withColumnRenamed(source_name, label)

In [0]:
# Remove the 'Features' and 'Descriptions' columns from the metadata frame.
columns_to_drop = ('Features', 'Descriptions')
meta_df = meta_df.drop(*columns_to_drop)

display(meta_df)

In [0]:
# Show the review frame as it stands at this point in the notebook.
display(filtered_review_df)

In [0]:
# Inner join on the shared key: only rows whose 'Parent ASIN' appears in both
# frames are kept.
join_key = 'Parent ASIN'
merged_df = filtered_review_df.join(meta_df, on=join_key, how='inner')

display(merged_df)

In [0]:
# Row count of the joined frame.
merged_row_count = merged_df.count()
print("Number of rows:", merged_row_count)

In [0]:
# List the column names of the joined frame.
print(merged_df.columns)

In [0]:
# Output location: processed/<main_category>.parquet in the same bucket.
s3_path = f"s3a://{bucket_name}/processed/{main_category}.parquet"

# Replace any existing output at that path; files are Snappy-compressed.
merged_df.write.parquet(s3_path, mode="overwrite", compression="SNAPPY")