In [17]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

In [18]:
# Obtain the Spark session for this notebook (getOrCreate reuses an existing
# session if one is already running, otherwise it builds a new one).
spark = (
    SparkSession.builder
    .appName("AmazonReviewsIngestion")
    .getOrCreate()
)

In [19]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Column names of the reviews dataset, in the order they are declared in the schema.
review_columns = [
    "product_id",
    "product_title",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "review_headline",
    "review_body",
    "review_date",
]

# Fixed schema: every column is declared as a nullable string, so no numeric
# or date parsing is requested from the CSV reader.
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in review_columns]
)

In [20]:
# Path of the input CSV file
local_file_path = "clean_books_10k.csv"

# Reader settings: comma-separated, first line is a header, quoted fields may
# span several lines, and '"' serves as both the quote and the escape character.
csv_options = {
    "sep": ",",
    "header": True,
    "multiLine": True,
    "quote": '"',
    "escape": '"',
}

# Load the file with the fixed schema instead of inferring column types.
reviews_df = spark.read.csv(local_file_path, schema=schema, **csv_options)

# Show the first five rows, then the column layout of the resulting DataFrame.
print("Data Sample:")
reviews_df.show(5)

print("Schema:")
reviews_df.printSchema()

Data Sample:
+----------+--------------------+-----------+-------------+-----------+--------------------+--------------------+-----------+
|product_id|       product_title|star_rating|helpful_votes|total_votes|     review_headline|         review_body|review_date|
+----------+--------------------+-----------+-------------+-----------+--------------------+--------------------+-----------+
|0312977379|    Beware the Night|        4.0|         61.0|       79.0|A book that actua...|Unlike many books...| 2005-10-13|
|1420832158|JEET KUNE DO: THE...|        5.0|          1.0|        4.0|Something  For Ev...|This book is the ...| 2005-10-13|
|0312977379|    Beware the Night|        5.0|         12.0|       18.0|    Beware the Night|When I started th...| 2005-10-13|
|0312336853|Shooter: The Auto...|        5.0|          1.0|        4.0|Hard to put this ...|This book has som...| 2005-10-13|
|0756607574|             Panties|        4.0|          5.0|       12.0|         A Nice Read|This book is 

In [21]:
# HDFS output directory
hdfs_path = "hdfs://localhost:54310/user/ubuntu/books_dataset/"

# coalesce(1) collapses the data to a single partition, so Spark emits one
# part file inside the output directory (hdfs_path itself is a directory).
#
# quote/escape are set explicitly to mirror the options used by the CSV
# readers in this notebook (escape='"'). The writer's default escape
# character is a backslash, so without these options fields containing
# embedded quotes would be written in a form those readers do not expect.
#
# mode("overwrite") replaces any existing content at hdfs_path.
reviews_df.coalesce(1).write \
    .option("header", True) \
    .option("quote", '"') \
    .option("escape", '"') \
    .mode("overwrite") \
    .csv(hdfs_path)

print(f"Data written to HDFS at {hdfs_path}")

Data written to HDFS at hdfs://localhost:54310/user/ubuntu/books_dataset/


In [22]:
# Directory watched by the streaming reader; each new CSV file placed here
# is picked up as additional input.
streaming_input_path = "hdfs://localhost:54310/user/ubuntu/streaming_input/"

# Create a streaming DataFrame over the CSV files in the input directory.
# The parsing options match the batch reader, except that header is False:
# header lines are therefore read as ordinary rows and removed by the filter below.
streaming_df = spark.readStream \
    .option("sep", ",") \
    .option("header", False) \
    .option("multiLine", True) \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .schema(schema) \
    .csv(streaming_input_path)

# Drop header rows, i.e. rows whose product_id cell holds the column name itself.
# NOTE(review): under SQL null semantics this comparison also excludes rows
# whose product_id is null - confirm that this is acceptable.
streaming_df = streaming_df.filter(streaming_df["product_id"] != "product_id")

# Example query: count reviews by rating as files arrive
query = streaming_df.groupBy("star_rating").count()

# "complete" output mode re-emits the full aggregated table on every trigger,
# so each console batch shows the running totals so far.
# No checkpoint location is configured, so Spark uses a temporary one.
query_writer = query.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Report the directory that is actually being watched, so the instruction
# matches streaming_input_path.
print(f"Directory-based streaming started. Copy CSV chunks into '{streaming_input_path}' to see updates.")

Directory-based streaming started. Copy CSV chunks into 'streaming_folder/' to see updates.


25/09/25 22:31:42 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2ac2002b-a88d-4345-a5ae-7ccdd0e7285c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/09/25 22:31:42 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+-----+
|star_rating|count|
+-----------+-----+
|        1.0|  117|
|        5.0|  560|
|        4.0|  184|
|        2.0|   50|
|        3.0|   89|
+-----------+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+-----+
|star_rating|count|
+-----------+-----+
|        1.0|  219|
|        5.0| 1116|
|        4.0|  380|
|        2.0|  110|
|        3.0|  175|
+-----------+-----+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------+-----+
|star_rating|count|
+-----------+-----+
|        1.0|  318|
|        5.0| 1702|
|        4.0|  559|
|        2.0|  175|
|        3.0|  246|
+-----------+-----+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+-----------+-----+
|star_rating|count|
+-----------+-----+
|        1.0|  421|
|        5.0| 2286|
|        4.0|  730|
|        2.0|  227|
|        3.0|  336|
+-----------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+-----------+-----+
|star_rating|count|
+-----------+-----+
|        1.0|  510|
|        5.0| 2839|
|        4.0|  916|
|        2.0|  305|
|        3.0|  430|
+-----------+-----+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+-----------+-----+
|star_rating|count|
+-----------+-----+
|        1.0|  569|
|        5.0| 3446|
|        4.0| 1123|
|        2.0|  357|
|        3.0|  505|
+-----------+-----+



                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+-----------+-----+
|star_rating|count|
+-----------+-----+
|        1.0|  655|
|        5.0| 3993|
|        4.0| 1324|
|        2.0|  429|
|        3.0|  599|
+-----------+-----+



                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+-----------+-----+
|star_rating|count|
+-----------+-----+
|        1.0|  728|
|        5.0| 4578|
|        4.0| 1535|
|        2.0|  478|
|        3.0|  681|
+-----------+-----+



                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+-----------+-----+
|star_rating|count|
+-----------+-----+
|        1.0|  801|
|        5.0| 5157|
|        4.0| 1741|
|        2.0|  536|
|        3.0|  765|
+-----------+-----+



                                                                                

-------------------------------------------
Batch: 9
-------------------------------------------
+-----------+-----+
|star_rating|count|
+-----------+-----+
|        1.0|  884|
|        5.0| 5818|
|        4.0| 1862|
|        2.0|  590|
|        3.0|  846|
+-----------+-----+

