In [1]:
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'


# Import SparkSession class
# SparkSession is the entry point to use Spark SQL and DataFrame API
from pyspark.sql import SparkSession

# Import commonly used Spark SQL functions
# col   -> used to refer to a column in a DataFrame
# when  -> used for conditional logic (similar to if-else)
# count -> used for counting rows or values
# size  -> used to get the size of an array-type column
from pyspark.sql.functions import col, when, count, size


# Create a SparkSession object
# This starts a Spark application and connects to the Spark cluster
spark = SparkSession.builder\
    .appName("AmazonAppliancesIngestionNotebook").getOrCreate()                                   
# Name of the Spark application (useful for monitoring)
# Creates a new session or returns existing one


In [4]:
REVIEWS_RAW_PATH = "amazon-review-analytics/big-data-pipeline/data/raw/reviews/Appliances.json"
METADATA_RAW_PATH = "amazon-review-analytics/big-data-pipeline/data/raw/metadata/meta_Appliances.json"

reviews_raw_df = spark.read.json(REVIEWS_RAW_PATH)

metadata_raw_df = spark.read.json(METADATA_RAW_PATH)



reviews_raw_df.printSchema()
metadata_raw_df.printSchema()


print("Reviews count:", reviews_raw_df.count())
print("Metadata count:", metadata_raw_df.count())




root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Design:: string (nullable = true)
 |    |-- Flavor:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Item Package Quantity:: string (nullable = true)
 |    |-- Length:: string (nullable = true)
 |    |-- Package Quantity:: string (nullable = true)
 |    |-- Package Type:: string (nullable = true)
 |    |-- Pattern:: string (nullable = true)
 |    |-- Scent:: string (nullable = true)
 |    |-- Size Name:: string (nullable = true)
 |    |-- Size:: string (nullable = true)
 |    |-- Style Name:: string (nullable = true)
 |    |-- Style:: stri

In [5]:
reviews_raw_df.show(1, truncate=False)

metadata_raw_df.show(1, truncate=False)


+----------+-----+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
from pyspark.sql.functions import col

reviews_selected_df = reviews_raw_df.select(
    col("reviewerID"),
    col("verified"),
    col("asin"),
    col("overall"),
    col("reviewText"),
    col("summary"),
    col("unixReviewTime")
)

reviews_selected_df.printSchema()
reviews_selected_df.show(5, truncate=False)


root
 |-- reviewerID: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

+--------------+--------+----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
metadata_selected_df = metadata_raw_df.select(
    col("asin"),
    col("title"),
    col("brand"),
    col("category")
)

metadata_selected_df.printSchema()
metadata_selected_df.show(5, truncate=False)


root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------+---------------------------------------------------------------------------------------------------------------------------------+--------------+------------------------------------------------------------------------------+
|asin      |title                                                                                                                            |brand         |category                                                                      |
+----------+---------------------------------------------------------------------------------------------------------------------------------+--------------+------------------------------------------------------------------------------+
|7301113188|Tupperware Freezer Square Round Container Set of 6                        

In [8]:
print("Reviews columns:", reviews_selected_df.columns)
print("Metadata columns:", metadata_selected_df.columns)


Reviews columns: ['reviewerID', 'verified', 'asin', 'overall', 'reviewText', 'summary', 'unixReviewTime']
Metadata columns: ['asin', 'title', 'brand', 'category']


In [9]:
print("Reviews count:", reviews_selected_df.count())
print("Metadata count:", metadata_selected_df.count())


Reviews count: 602777
Metadata count: 30445


In [10]:
reviews_selected_df.printSchema()
metadata_selected_df.printSchema()


root
 |-- reviewerID: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [11]:
from pyspark.sql.functions import col, trim, size
from pyspark.sql.types import StringType, ArrayType


for c in reviews_selected_df.columns:
    null_count = reviews_selected_df.filter(col(c).isNull()).count()
    print(f"Reviews | {c} | NULL count = {null_count}")



Reviews | reviewerID | NULL count = 0
Reviews | verified | NULL count = 0
Reviews | asin | NULL count = 0
Reviews | overall | NULL count = 0
Reviews | reviewText | NULL count = 324
Reviews | summary | NULL count = 128
Reviews | unixReviewTime | NULL count = 0


In [12]:
for c in metadata_selected_df.columns:
    null_count = metadata_selected_df.filter(col(c).isNull()).count()
    print(f"Metadata | {c} | NULL count = {null_count}")


Metadata | asin | NULL count = 0
Metadata | title | NULL count = 0
Metadata | brand | NULL count = 0
Metadata | category | NULL count = 0


In [13]:
for field in reviews_selected_df.schema.fields:
    if isinstance(field.dataType, StringType):
        empty_count = reviews_selected_df.filter(
            trim(col(field.name)) == ""
        ).count()
        print(f"Reviews | {field.name} | Empty string count = {empty_count}")



Reviews | reviewerID | Empty string count = 0
Reviews | asin | Empty string count = 0
Reviews | reviewText | Empty string count = 0
Reviews | summary | Empty string count = 2


In [14]:
for field in metadata_selected_df.schema.fields:
    if isinstance(field.dataType, StringType):
        empty_count = metadata_selected_df.filter(
            trim(col(field.name)) == ""
        ).count()
        print(f"Metadata | {field.name} | Empty string count = {empty_count}")



Metadata | asin | Empty string count = 0
Metadata | title | Empty string count = 0
Metadata | brand | Empty string count = 584


In [15]:
for field in metadata_selected_df.schema.fields:
    if isinstance(field.dataType, ArrayType):
        empty_array_count = metadata_selected_df.filter(
            size(col(field.name)) == 0
        ).count()
        print(f"Metadata | {field.name} | Empty array count = {empty_array_count}")



Metadata | category | Empty array count = 806


In [16]:
reviews_selected_df.select("overall") \
    .groupBy("overall") \
    .count() \
    .orderBy("overall") \
    .show()



+-------+------+
|overall| count|
+-------+------+
|    1.0| 59627|
|    2.0| 20734|
|    3.0| 30652|
|    4.0| 75476|
|    5.0|416288|
+-------+------+



In [17]:
REVIEWS_CLEANED_PATH = "amazon-review-analytics/big-data-pipeline/data/cleaned/reviews".format(
    spark.sparkContext.sparkUser()
)

reviews_selected_df.write \
    .mode("overwrite") \
    .parquet(REVIEWS_CLEANED_PATH)



METADATA_CLEANED_PATH = "amazon-review-analytics/big-data-pipeline/data/cleaned/metadata".format(
    spark.sparkContext.sparkUser()
)

metadata_selected_df.write \
    .mode("overwrite") \
    .parquet(METADATA_CLEANED_PATH)



In [18]:
cleaned_reviews_df = spark.read.parquet(REVIEWS_CLEANED_PATH)
cleaned_metadata_df = spark.read.parquet(METADATA_CLEANED_PATH)

print("Cleaned Reviews count:", cleaned_reviews_df.count())
print("Cleaned Metadata count:", cleaned_metadata_df.count())

cleaned_reviews_df.printSchema()
cleaned_metadata_df.printSchema()


Cleaned Reviews count: 602777
Cleaned Metadata count: 30445
root
 |-- reviewerID: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [19]:

# HDFS paths for cleaned data (Silver layer)
REVIEWS_CLEANED_PATH = "amazon-review-analytics/big-data-pipeline/data/cleaned/reviews".format(
    spark.sparkContext.sparkUser()
)

METADATA_CLEANED_PATH = "amazon-review-analytics/big-data-pipeline/data/cleaned/metadata".format(
    spark.sparkContext.sparkUser()
)

# Read cleaned reviews data from HDFS
cleaned_reviews_df = spark.read.parquet(REVIEWS_CLEANED_PATH)

# Read cleaned metadata data from HDFS
cleaned_metadata_df = spark.read.parquet(METADATA_CLEANED_PATH)



In [20]:
# Check schema of reviews to confirm 'asin' exists and is flat
cleaned_reviews_df.printSchema()

# Check schema of metadata to confirm 'asin' exists and is flat
cleaned_metadata_df.printSchema()


root
 |-- reviewerID: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [21]:
from pyspark.sql.functions import col

# Perform INNER JOIN between reviews and metadata on 'asin'
# INNER JOIN ensures only records present in BOTH datasets are retained
joined_df = cleaned_reviews_df.join(
    cleaned_metadata_df,
    on="asin",
    how="inner"
)


In [22]:
print("Joined dataset count:", joined_df.count())


Joined dataset count: 615747


In [23]:
# Display a few joined records for manual validation
joined_df.show(1, truncate=False)


+----------+--------------+--------+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------------------------------------------------------------------+
|asin      |reviewerID    |verified|overall|reviewText                                                                                                                                                |summary                              |unixReviewTime|title                                                                                                                                                             |brand        |category                                                      

In [24]:
# Inspect schema to confirm all expected columns are present
joined_df.printSchema()



root
 |-- asin: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- title: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [25]:
from pyspark.sql.functions import col, trim, size

# Apply mandatory column enforcement
curated_df = joined_df.filter(
    # asin must exist and not be empty
    col("asin").isNotNull() &
    (trim(col("asin")) != "") &

    # reviewText must exist and not be empty
    col("reviewText").isNotNull() &
    (trim(col("reviewText")) != "") &
    
    # summary must exist and not be empty (NEW RULE)
    col("summary").isNotNull() &
    (trim(col("summary")) != "") &

    # overall rating must exist
    col("overall").isNotNull() &

    # brand must exist and not be empty
    col("brand").isNotNull() &
    (trim(col("brand")) != "") &

    # category array must exist and not be empty
    col("category").isNotNull() &
    (size(col("category")) > 0)&

    # ONLY verified customers
    (col("verified") == True)
)


In [26]:
print("Before enforcement count:", joined_df.count())
print("After enforcement count:", curated_df.count())


Before enforcement count: 615747
After enforcement count: 564410


In [27]:
# Check NULL values in curated dataset
for c in curated_df.columns:
    null_count = curated_df.filter(col(c).isNull()).count()
    print(f"Curated | {c} | NULL count = {null_count}")

    

Curated | asin | NULL count = 0
Curated | reviewerID | NULL count = 0
Curated | verified | NULL count = 0
Curated | overall | NULL count = 0
Curated | reviewText | NULL count = 0
Curated | summary | NULL count = 0
Curated | unixReviewTime | NULL count = 0
Curated | title | NULL count = 0
Curated | brand | NULL count = 0
Curated | category | NULL count = 0


In [28]:
from pyspark.sql.types import StringType

# Check empty strings in curated dataset
for field in curated_df.schema.fields:
    if isinstance(field.dataType, StringType):
        empty_count = curated_df.filter(
            trim(col(field.name)) == ""
        ).count()
        print(f"Curated | {field.name} | Empty string count = {empty_count}")


Curated | asin | Empty string count = 0
Curated | reviewerID | Empty string count = 0
Curated | reviewText | Empty string count = 0
Curated | summary | Empty string count = 0
Curated | title | Empty string count = 0
Curated | brand | Empty string count = 0


In [29]:
from pyspark.sql.types import ArrayType

# Check empty arrays in curated dataset
for field in curated_df.schema.fields:
    if isinstance(field.dataType, ArrayType):
        empty_array_count = curated_df.filter(
            size(col(field.name)) == 0
        ).count()
        print(f"Curated | {field.name} | Empty array count = {empty_array_count}")


Curated | category | Empty array count = 0


In [30]:
# HDFS path for final curated (Gold) dataset
CURATED_OUTPUT_PATH = "amazon-review-analytics/big-data-pipeline/data/processed/appliance_reviews_curated".format(
    spark.sparkContext.sparkUser()
)


# Write curated dataset in Parquet format to HDFS
# Overwrite is safe because Gold data is reproducible from upstream layers
curated_df.write \
    .mode("overwrite") \
    .parquet(CURATED_OUTPUT_PATH)



In [None]:
# Read back curated data from HDFS to validate
gold_df = spark.read.parquet(CURATED_OUTPUT_PATH)

# Validate row count
print("Gold dataset count:", gold_df.count())

# Validate schema
gold_df.printSchema()

# Preview data
gold_df.show(1

In [31]:
, truncate=False)

Gold dataset count: 564410
root
 |-- asin: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- title: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------+--------------+--------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [32]:
from pyspark.sql.functions import col, trim, size

# Ensure mandatory columns have no NULL / empty values
assert gold_df.filter(
    col("asin").isNull() |
    (trim(col("asin")) == "") |
    col("reviewText").isNull() |
    (trim(col("reviewText")) == "") |
    col("summary").isNull() |
    (trim(col("summary")) == "") |
    col("overall").isNull() |
    col("brand").isNull() |
    (trim(col("brand")) == "") |
    col("category").isNull() |
    (size(col("category")) == 0)
).count() == 0
