In [1]:
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .appName("AmazonAppliancesIngestionNotebook") \
    .getOrCreate()


In [3]:
reviews_df = spark.read.json("file:///home/talentum/projects/amazon-review-analytics/big-data-pipeline/data/raw/Appliances.json")
reviews_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Design:: string (nullable = true)
 |    |-- Flavor:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Item Package Quantity:: string (nullable = true)
 |    |-- Length:: string (nullable = true)
 |    |-- Package Quantity:: string (nullable = true)
 |    |-- Package Type:: string (nullable = true)
 |    |-- Pattern:: string (nullable = true)
 |    |-- Scent:: string (nullable = true)
 |    |-- Size Name:: string (nullable = true)
 |    |-- Size:: string (nullable = true)
 |    |-- Style Name:: string (nullable = true)
 |    |-- Style:: stri

In [4]:
reviews_df_clean = reviews_df.select(
    "reviewerID",
    "verified",
    "asin",
    "overall",
    "reviewText",
    "summary",
    "unixReviewTime"
)

reviews_df_clean.show(5)
reviews_df_clean.describe().show()

+--------------+--------+----------+-------+--------------------+--------------------+--------------+
|    reviewerID|verified|      asin|overall|          reviewText|             summary|unixReviewTime|
+--------------+--------+----------+-------+--------------------+--------------------+--------------+
|A3NHUQ33CFH3VM|   false|1118461304|    5.0|Not one thing in ...|Clear on what lea...|    1385510400|
|A3SK6VNBQDNBJE|   false|1118461304|    5.0|I have enjoyed Dr...|Becoming more inn...|    1383264000|
|A3SOFHUR27FO3K|   false|1118461304|    5.0|Alan Gregerman be...|The World from Di...|    1381363200|
|A1HOG1PYCAE157|   false|1118461304|    5.0|Alan Gregerman is...|Strangers are You...|    1381276800|
|A26JGAM6GZMM4V|   false|1118461304|    5.0|As I began to rea...|How and why it is...|    1378512000|
+--------------+--------+----------+-------+--------------------+--------------------+--------------+
only showing top 5 rows

+-------+--------------------+-------------------+-------

In [5]:
from pyspark.sql.functions import col, when, count, size

def get_empty_condition(column_name, data_type):
    # If it's a standard String column
    if data_type == 'string':
        return col(column_name) == ""
    # If it's an Array (like your 'category' column)
    elif 'array' in data_type:
        return size(col(column_name)) == 0
    # For other types (int, float, etc.), they can't be "empty", only null
    else:
        return col(column_name).isNull()

# Apply the logic dynamically to meta_df_safe
reviews_df_clean.select([
    count(when(get_empty_condition(c, t), c)).alias(c) 
    for c, t in reviews_df_clean.dtypes
]).show()

reviews_df_clean.count()


+----------+--------+----+-------+----------+-------+--------------+
|reviewerID|verified|asin|overall|reviewText|summary|unixReviewTime|
+----------+--------+----+-------+----------+-------+--------------+
|         0|       0|   0|      0|         0|      0|             0|
+----------+--------+----+-------+----------+-------+--------------+



602777

In [6]:
df_filtered = reviews_df_clean.filter(col("verified") == "true")
df_filtered.count()

563870

In [7]:

reviews_df_clean.write.mode("overwrite") \
    .parquet("file:///home/talentum/projects/amazon-review-analytics/big-data-pipeline/data/cleaned/reviews_bronze")



In [8]:
#Metadata Handling

#Read Metadata
meta_df = spark.read.json("file:///home/talentum/projects/amazon-review-analytics/big-data-pipeline/data/raw/meta_Appliances.json")

meta_df.printSchema()

root
 |-- also_buy: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- also_view: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- date: string (nullable = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- details: struct (nullable = true)
 |    |-- 
    Item Weight: 
    : string (nullable = true)
 |    |-- 
    Product Dimensions: 
    : string (nullable = true)
 |    |-- ASIN:: string (nullable = true)
 |    |-- ASIN: : string (nullable = true)
 |    |-- Batteries: string (nullable = true)
 |    |-- Domestic Shipping: : string (nullable = true)
 |    |-- International Shipping: : string (nullable = true)
 |    |-- Item model number:: string (nullable = true)
 |    |-- Publisher:: string (nullable = true)
 |    |-- Shipping

In [9]:
#Select SAFE Columns Only

meta_df_safe = meta_df.select(
    "asin",
    "title",
    "brand",
    "price",
    "category",
    "main_cat"
)
meta_df_safe.show(3)

+----------+--------------------+--------------+-------+--------------------+-----------+
|      asin|               title|         brand|  price|            category|   main_cat|
+----------+--------------------+--------------+-------+--------------------+-----------+
|7301113188|Tupperware Freeze...|    Tupperware|       |[Appliances, Refr...| Appliances|
|7861850250|2 X Tupperware Pu...|    Tupperware|  $3.62|[Appliances, Refr...| Appliances|
|8792559360|The Cigar - Momen...|The Cigar Book|$150.26|[Appliances, Part...|Amazon Home|
+----------+--------------------+--------------+-------+--------------------+-----------+
only showing top 3 rows



In [10]:
from pyspark.sql.functions import col, when, count, size

def get_empty_condition(column_name, data_type):
    # If it's a standard String column
    if data_type == 'string':
        return col(column_name) == ""
    # If it's an Array (like your 'category' column)
    elif 'array' in data_type:
        return size(col(column_name)) == 0
    # For other types (int, float, etc.), they can't be "empty", only null
    else:
        return col(column_name).isNull()

# Apply the logic dynamically to meta_df_safe
meta_df_safe.select([
    count(when(get_empty_condition(c, t), c)).alias(c) 
    for c, t in meta_df_safe.dtypes
]).show()




+----+-----+-----+-----+--------+--------+
|asin|title|brand|price|category|main_cat|
+----+-----+-----+-----+--------+--------+
|   0|    0|  584|10292|     806|      91|
+----+-----+-----+-----+--------+--------+



In [11]:
meta_df_filtered=meta_df_safe.drop("price","main_cat")

In [12]:
def get_empty_condition(column_name, data_type):
    # If it's a standard String column
    if data_type == 'string':
        return col(column_name) == ""
    # If it's an Array (like your 'category' column)
    elif 'array' in data_type:
        return size(col(column_name)) == 0
    # For other types (int, float, etc.), they can't be "empty", only null
    else:
        return col(column_name).isNull()

# Apply the logic dynamically to meta_df_safe
meta_df_filtered.select([
    count(when(get_empty_condition(c, t), c)).alias(c) 
    for c, t in meta_df_filtered.dtypes
]).show()

+----+-----+-----+--------+
|asin|title|brand|category|
+----+-----+-----+--------+
|   0|    0|  584|     806|
+----+-----+-----+--------+



In [13]:
from pyspark.sql.functions import col, trim, when

meta_df_norm = meta_df_filtered.withColumn(
    "brand",
    when(
        col("brand").isNull() | (trim(col("brand")) == ""),
        None
    ).otherwise(col("brand"))
)

from pyspark.sql.functions import first

brand_by_title = meta_df_norm \
    .filter(col("brand").isNotNull()) \
    .groupBy("title") \
    .agg(first("brand", ignorenulls=True).alias("brand_from_title"))




In [14]:
meta_df_brand_filled = meta_df_norm \
    .join(brand_by_title, on="title", how="left") \
    .withColumn(
        "brand",
        when(col("brand").isNull(), col("brand_from_title"))
        .otherwise(col("brand"))
    ) \
    .drop("brand_from_title")


In [16]:
def get_empty_condition(column_name, data_type):
    # If it's a standard String column
    if data_type == 'string':
        return col(column_name) == ""
    # If it's an Array (like your 'category' column)
    elif 'array' in data_type:
        return size(col(column_name)) == 0
    # For other types (int, float, etc.), they can't be "empty", only null
    else:
        return col(column_name).isNull()

# Apply the logic dynamically to meta_df_safe
meta_df_brand_filled.select([
    count(when(get_empty_condition(c, t), c)).alias(c) 
    for c, t in meta_df_brand_filled.dtypes
]).show()

+-----+----+-----+--------+
|title|asin|brand|category|
+-----+----+-----+--------+
|    0|   0|    0|     806|
+-----+----+-----+--------+



In [17]:
from pyspark.sql.functions import size

meta_df_norm2 = meta_df_brand_filled.withColumn(
    "category",
    when(
        col("category").isNull() | (size(col("category")) == 0),
        None
    ).otherwise(col("category"))
)


In [18]:
category_by_title = meta_df_norm2 \
    .filter(col("category").isNotNull()) \
    .groupBy("title") \
    .agg(first("category", ignorenulls=True).alias("category_from_title"))



meta_df_final_clean = meta_df_norm2 \
    .join(category_by_title, on="title", how="left") \
    .withColumn(
        "category",
        when(col("category").isNull(), col("category_from_title"))
        .otherwise(col("category"))
    ) \
    .drop("category_from_title")



In [19]:
def get_empty_condition(column_name, data_type):
    # If it's a standard String column
    if data_type == 'string':
        return col(column_name) == ""
    # If it's an Array (like your 'category' column)
    elif 'array' in data_type:
        return size(col(column_name)) == 0
    # For other types (int, float, etc.), they can't be "empty", only null
    else:
        return col(column_name).isNull()

# Apply the logic dynamically to meta_df_safe
meta_df_final_clean.select([
    count(when(get_empty_condition(c, t), c)).alias(c) 
    for c, t in meta_df_final_clean.dtypes
]).show()

+-----+----+-----+--------+
|title|asin|brand|category|
+-----+----+-----+--------+
|    0|   0|    0|       0|
+-----+----+-----+--------+



In [None]:
#Write Metadata Bronze Parquet

meta_df_safe.write.mode("overwrite") \
    .parquet("file:///home/talentum/projects/amazon-review-analytics/big-data-pipeline/data/cleaned/meta_bronze")
