In [None]:
# Read the `fs.gs.system.bucket` property from the session's Hadoop configuration.
hadoop_conf = spark._jsc.hadoopConfiguration()
bucket = hadoop_conf.get("fs.gs.system.bucket")

# Base GCS folder holding the datasets; point this at another directory
# if your files are stored elsewhere.
data = "".join(["gs://", bucket, "/Datasets/"])
print(data)

In [None]:
# Read the review file (JSON Lines) from the dataset folder and print the inferred schema.
reviews_path = data + "All_Beauty.jsonl"
df = spark.read.json(reviews_path)
df.printSchema()

In [None]:
from pyspark.sql.functions import col, when, length, regexp_replace, trim, to_timestamp, size, coalesce, lit, date_format, lower

# Remove `asin`, add the boolean `review_image` flag (True when the review has
# at least one image), then remove the `images` column itself.
# `coalesce(..., lit(0))` makes a NULL size count as "no image".
has_review_image = coalesce(size(col("images")), lit(0)) > 0
df = (
    df.drop("asin")
      .withColumn("review_image", has_review_image)
      .drop("images")
)

# Cast the vote count and rating to integers and the purchase flag to boolean.
for column_name, spark_type in (
    ("helpful_vote", "int"),
    ("rating", "int"),
    ("verified_purchase", "boolean"),
):
    df = df.withColumn(column_name, col(column_name).cast(spark_type))

# timestamp (ms) -> timestamp (sec) -> string in "yyyy-MM-dd HH:mm:ss" format
epoch_seconds = (col("timestamp") / 1000).cast("double")
df = df.withColumn(
    "timestamp",
    date_format(to_timestamp(epoch_seconds), "yyyy-MM-dd HH:mm:ss"),
)

# Collapse every whitespace run to a single space and strip both ends
# of the free-text columns.
for text_column in ("title", "text"):
    squeezed = regexp_replace(col(text_column), r"\s+", " ")
    df = df.withColumn(text_column, trim(squeezed))

# Keep only rows whose rating lies in the inclusive range 1-5.
df = df.where(col("rating").between(1.0, 5.0))

# Drop repeated reviews: rows sharing user, product, timestamp and title count as one.
dedup_keys = ["user_id", "parent_asin", "timestamp", "title"]
df = df.dropDuplicates(dedup_keys)



# Final column layout of the review table.
final_cols = [
    "user_id", "parent_asin", "timestamp",
    "rating", "title", "text",
    "helpful_vote", "verified_purchase", "review_image",
]
df = df.select(final_cols)


In [None]:
# Preview the first 20 rows of the review table.
df.show(n=20)

In [None]:
# Load the product metadata.
# Case-sensitive name resolution is switched on before reading
# (presumably the metadata contains field names that differ only by case - TODO confirm).
spark.conf.set("spark.sql.caseSensitive", "true")
meta_path = data + "meta_All_Beauty.jsonl"
df_meta = spark.read.json(meta_path)

In [None]:
from pyspark.sql.functions import transform, array_join

# Add boolean flags telling whether a product has at least one video / image.
# A NULL size is treated as 0, i.e. "none".
for flag_name, source_column in (
    ("product_video", "videos"),
    ("product_image", "images"),
):
    df_meta = df_meta.withColumn(
        flag_name,
        coalesce(size(col(source_column)), lit(0)) > 0,
    )

# Remove the images, videos and details columns.
df_meta = df_meta.drop("images", "videos", "details")

def _squeeze_ws(value):
    """Collapse each whitespace run in `value` to one space and trim both ends."""
    return trim(regexp_replace(value, r"\s+", " "))


# Apply the whitespace clean-up to every element of `features` and `description`.
for array_column in ("features", "description"):
    df_meta = df_meta.withColumn(
        array_column,
        transform(col(array_column), _squeeze_ws),
    )

# Apply the same clean-up to `title` and `store`; `store` is additionally lower-cased.
df_meta = df_meta.withColumn("title", _squeeze_ws(col("title")))
df_meta = df_meta.withColumn("store", lower(_squeeze_ws(col("store"))))



# Final column layout of the metadata table.
meta_final_cols = [
    "parent_asin", "title", "main_category", "categories",
    "price", "features", "description",
    "average_rating", "rating_number", "store",
    "product_image", "product_video",
]

df_meta = df_meta.select(meta_final_cols)

In [None]:
# Print the metadata schema, then display a single sample row.
df_meta.printSchema()
df_meta.show(n=1)

In [None]:
from pyspark.sql.functions import col

# Attach product metadata to every review. A left join keeps all rows from `df`,
# including reviews with no matching metadata row.
#
# Both tables carry a `title` column (review title vs. product title). Joining on
# the key *name* only de-duplicates `parent_asin`, so without a rename `df_joined`
# would contain two `title` columns and any later reference to "title" would be
# ambiguous. The metadata title is therefore exposed as `product_title`;
# `df_meta` itself is left untouched.
meta_for_join = df_meta.withColumnRenamed("title", "product_title")

df_joined = df.join(
    meta_for_join,              # the metadata table (title -> product_title)
    on="parent_asin",           # join key
    how="left",                 # keep all rows from df
)

df_joined.show(3)