In [None]:
# BookBridge ETL for Dataproc

# This script is designed to run on a GCP Dataproc cluster as a Spark job.
# It does two main things:

# 1. Review ETL:
#    - Load Amazon reviews JSONL from GCS
#    - Filter verified purchases
#    - Select top N popular books by asin
#    - Build user purchase sequences
#    - Export item2vec training corpus as gzipped text (one sequence per line)

# 2. Metadata ETL:
#    - Load book metadata JSONL from GCS
#    - Select a slim set of fields useful for recommendation display
#    - Join with top books
#    - Export as JSON (for API / Cloud Run to consume)

# Usage (Dataproc / spark-submit):

#   spark-submit \
#     --deploy-mode cluster \
#     BookBridgeETL.py \
#     --bucket book_bridge \
#     --top-k 100000

In [2]:
from pyspark.sql import SparkSession

# Build the session for this job. getOrCreate() hands back an already-running
# session when one exists, so the values printed below show what we really got.
spark = (
    SparkSession.builder
    .appName("BookBridge_ETL")
    .getOrCreate()
)

print(f"Spark Version: {spark.version}")
print(f"App Name: {spark.sparkContext.appName}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/26 20:05:36 INFO SparkEnv: Registering MapOutputTracker
25/11/26 20:05:36 INFO SparkEnv: Registering BlockManagerMaster
25/11/26 20:05:36 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/11/26 20:05:37 INFO SparkEnv: Registering OutputCommitCoordinator


In [6]:
# GCS bucket that every gs:// path in this notebook is built from.
BUCKET_NAME = "book_bridge"

# Raw inputs in JSON-lines format: book metadata and book reviews.
meta_path = f"gs://{BUCKET_NAME}/meta_Books.jsonl"
reviews_path = f"gs://{BUCKET_NAME}/Books.jsonl"

print(f"Loading data from: {reviews_path}")

Loading data from: gs://book_bridge/Books.jsonl


In [8]:
# Resolve column / field names case-sensitively for the rest of the session.
# NOTE(review): presumably needed because the free-form `details` struct has
# keys that differ only by letter case - confirm before removing.
spark.conf.set("spark.sql.caseSensitive", "true")

# Book metadata. No schema is supplied, so Spark infers it from the data.
df_meta = spark.read.format("json").load(meta_path)
df_meta.printSchema()

                                                                                

root
 |-- author: struct (nullable = true)
 |    |-- about: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- avatar: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- bought_together: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- details: struct (nullable = true)
 |    |-- 11 54: string (nullable = true)
 |    |-- 15 69: string (nullable = true)
 |    |-- 18 86: string (nullable = true)
 |    |-- 22 49: string (nullable = true)
 |    |-- 3 5 and 5 25 disks: string (nullable = true)
 |    |-- 3 5 disk: string (nullable = true)
 |    |-- Accessory: string (nullable = true)
 |    |-- Additional product features: string (nullable = true)
 |    |-- Address Book: string (nullable = true)
 |    |-- Age Range (Description): string

25/11/26 20:10:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [9]:
# Book reviews. No schema is supplied, so Spark infers it from the data.
df_reviews = spark.read.format("json").load(reviews_path)
df_reviews.printSchema()



root
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)





In [18]:
# Peek at a few reviews. truncate=5 clips every value to 5 characters so the
# long free-text columns still fit in one table row.
print("\n--- Sample Review Data ---")
df_reviews.show(n=5, truncate=5)


--- Sample Review Data ---
+-----+------------+------+-----------+------+-----+---------+-----+-------+-----------------+
| asin|helpful_vote|images|parent_asin|rating| text|timestamp|title|user_id|verified_purchase|
+-----+------------+------+-----------+------+-----+---------+-----+-------+-----------------+
|B0...|           0| [{...|      B0...|   1.0|It...|    16...|No...|  AF...|             true|
|05...|           1|    []|      05...|   5.0|Up...|    16...|Up...|  AF...|             true|
|17...|           0|    []|      17...|   5.0|I ...|    16...|Ex...|  AF...|             true|
|05...|           0|    []|      05...|   5.0|Up...|    16...|Up...|  AF...|            false|
|08...|           0| [{...|      08...|   5.0|I ...|    16...|Be...|  AF...|             true|
+-----+------------+------+-----------+------+-----+---------+-----+-------+-----------------+
only showing top 5 rows



In [19]:
from pyspark.sql import functions as F

In [20]:
# Keep only the TOP_K books with the most verified-purchase reviews.
from pyspark.sql import functions as F

# Size of the book vocabulary to keep. Single source of truth for the cutoff
# and for the log message below.
TOP_K = 100_000

print(f"--- Calculating Top {TOP_K:,} Verified Books ---")

# 1. Keep verified purchases only.
# `verified_purchase` is a nullable boolean (see the printed reviews schema),
# so the column itself is the predicate; rows where it is null are dropped too.
verified_df = df_reviews.filter(F.col("verified_purchase"))

# 2. Number of verified reviews per book ('asin' is the book id).
book_counts = verified_df.groupBy("asin").count()

# 3. Rank by review count and keep the first TOP_K ids.
# The secondary sort on 'asin' breaks ties deterministically. Without it, the
# books sitting exactly at the cutoff are arbitrary, and a recomputation (for
# example after cached blocks are lost with an executor) could select a
# different set for the reviews join than for the metadata join.
# Only 'asin' is kept because the count is not needed by the later joins.
top_books_df = (
    book_counts
    .orderBy(F.col("count").desc(), F.col("asin").asc())
    .limit(TOP_K)
    .select("asin")
)

# 4. Cache: this DataFrame filters BOTH the reviews and the metadata, so
# without caching Spark would recompute the ranking for each use.
top_books_df.cache()

# cache() is lazy; run an action now so the ranking is computed and stored here.
count = top_books_df.count()
print(f"Successfully cached {count} top books.")

top_books_df.show(5)

--- Calculating Top 100,000 Verified Books ---


                                                                                

Successfully cached 100000 top books.
+----------+
|      asin|
+----------+
|B00L9B7IKE|
|B00JO8PEN2|
|B006LSZECO|
|B00DPM7TIG|
|B00CNQ7HAU|
+----------+
only showing top 5 rows



In [21]:
from pyspark.sql.functions import broadcast

# Users with fewer items than this are dropped: the original rationale is that
# the item2vec (Word2Vec) context window is usually 5, so 1-2 item histories
# carry little context.
MIN_SEQUENCE_LENGTH = 3

print("--- Filtering Reviews and Generating User Sequences ---")

# 1. Inner join with the top books.
# broadcast() hints a map-side join on the small id table; the inner join drops
# every review whose book is not in top_books_df.
filtered_reviews = verified_df.join(broadcast(top_books_df), "asin", "inner")

# 2. Build one chronologically ordered sequence per user.
# Sorting the DataFrame before groupBy() does NOT order the collected lists:
# the groupBy shuffle discards the prior sort, and collect_list() order is
# non-deterministic after a shuffle. Instead, collect (timestamp, asin) pairs
# and sort each user's array. sort_array() compares structs field by field, so
# the order is by timestamp with asin as a deterministic tie-breaker.
# Selecting `events.asin` then projects the sorted structs to an array of asin.
user_sequences_df = (
    filtered_reviews
    .groupBy("user_id")
    .agg(
        F.sort_array(
            F.collect_list(F.struct("timestamp", "asin"))
        ).alias("events")
    )
    .select("user_id", F.col("events.asin").alias("item_sequence"))
    # 3. Drop users whose history is too short to be useful.
    .filter(F.size(F.col("item_sequence")) >= MIN_SEQUENCE_LENGTH)
)

# 4. Convert each list to a space-separated string (for Gensim).
# ["B001", "B002"] -> "B001 B002"
final_training_data = user_sequences_df.select(
    F.concat_ws(" ", "item_sequence").alias("sentence")
)

# The sentences are counted, previewed and later written out. Cache them so
# the join + aggregation over the reviews runs once rather than once per action.
final_training_data.cache()

# --- Verification ---
print(f"Number of users with valid sequences: {final_training_data.count()}")

print("\n--- Sample Sequences (Ready for Gensim) ---")
final_training_data.show(5, truncate=False)

--- Filtering Reviews and Generating User Sequences ---


                                                                                

Number of users with valid sequences: 865397

--- Sample Sequences (Ready for Gensim) ---




+------------------------------------------------------------------------------------------------------------------------+
|sentence                                                                                                                |
+------------------------------------------------------------------------------------------------------------------------+
|B00AVMTF8U B00IJXAM80 0762782811 0425158543                                                                             |
|B07QYY1NN5 B07L4PL653 B07R3QYGHY                                                                                        |
|039958014X 0399226907 0545688868 1523501227 152350207X                                                                  |
|B00G3L6KQI 1538747251 1538747278 1455530441 0062311158 B00FJ3AC10 1451635834 0735213186 1416586296 0316055433 1944229450|
|1476812063 1423477758 0545174805                                                                                        |
+---------------

25/11/26 20:34:37 ERROR TransportClient: Failed to send RPC RPC 7472540338620573548 to /10.128.0.4:44462: io.netty.channel.StacklessClosedChannelException
io.netty.channel.StacklessClosedChannelException: null
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
25/11/26 20:34:37 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 40 from block manager BlockManagerId(35, item2vec-etl-w-0.us-central1-f.c.bookbridge-477802.internal, 41241, None)
java.io.IOException: Failed to send RPC RPC 7472540338620573548 to /10.128.0.4:44462: io.netty.channel.StacklessClosedChannelException
	at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:395) ~[spark-network-common_2.12-3.5.3.jar:3.5.3]
	at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:372) ~[spark-network-common_2.12-3.5.3.ja

In [22]:
# GCS destination for the item2vec training corpus.
OUTPUT_TRAIN_PATH = f"gs://{BUCKET_NAME}/item2vec_training_data"

print(f"--- Saving Training Data to {OUTPUT_TRAIN_PATH} ---")

# Write gzip-compressed text, one sentence per line, replacing earlier output.
# Deliberately no coalesce(1): the data is still huge, and (per the original
# author's note) gensim can stream the resulting multi-file output.
corpus_writer = (
    final_training_data.write
    .mode("overwrite")
    .option("compression", "gzip")
)
corpus_writer.text(OUTPUT_TRAIN_PATH)

print("Training data saved successfully.")

--- Saving Training Data to gs://book_bridge/item2vec_training_data ---


25/11/26 20:43:14 WARN YarnAllocator: Container from a bad node: container_1764180334063_0001_01_000039 on host: item2vec-etl-sw-nqrx.us-central1-f.c.bookbridge-477802.internal. Exit status: 143. Diagnostics: [2025-11-26 20:43:14.271]Container killed on request. Exit code is 143
[2025-11-26 20:43:14.272]Container exited with a non-zero exit code 143. 
[2025-11-26 20:43:14.277]Killed by external signal
.
25/11/26 20:43:14 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 38 for reason Container from a bad node: container_1764180334063_0001_01_000039 on host: item2vec-etl-sw-nqrx.us-central1-f.c.bookbridge-477802.internal. Exit status: 143. Diagnostics: [2025-11-26 20:43:14.271]Container killed on request. Exit code is 143
[2025-11-26 20:43:14.272]Container exited with a non-zero exit code 143. 
[2025-11-26 20:43:14.277]Killed by external signal
.
25/11/26 20:43:14 WARN YarnAllocator: Container from a bad node: container_1764180334063_0001_01_000040 on

[Stage 50:>                                                       (0 + 11) / 13]25/11/26 20:47:44 WARN TaskSetManager: Lost task 3.0 in stage 50.0 (TID 2642) (item2vec-etl-w-1.us-central1-f.c.bookbridge-477802.internal executor 40): FetchFailed(BlockManagerId(39, item2vec-etl-sw-nqrx.us-central1-f.c.bookbridge-477802.internal, 7337, None), shuffleId=7, mapIndex=1, mapId=2486, reduceId=250, message=
org.apache.spark.shuffle.FetchFailedException
	at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1408)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1103)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.sc

[Stage 50:>                                                        (0 + 9) / 13]25/11/26 20:47:44 WARN TaskSetManager: Lost task 5.0 in stage 50.0 (TID 2644) (item2vec-etl-w-0.us-central1-f.c.bookbridge-477802.internal executor 44): FetchFailed(BlockManagerId(38, item2vec-etl-sw-nqrx.us-central1-f.c.bookbridge-477802.internal, 7337, None), shuffleId=7, mapIndex=6, mapId=2491, reduceId=421, message=
org.apache.spark.shuffle.FetchFailedException
	at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1408)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1103)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.s

25/11/26 20:47:58 WARN TaskSetManager: Lost task 12.0 in stage 50.0 (TID 2651) (item2vec-etl-w-1.us-central1-f.c.bookbridge-477802.internal executor 29): FetchFailed(BlockManagerId(39, item2vec-etl-sw-nqrx.us-central1-f.c.bookbridge-477802.internal, 7337, None), shuffleId=7, mapIndex=1, mapId=2486, reduceId=994, message=
org.apache.spark.shuffle.FetchFailedException
	at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1408)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1103)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at 

Training data saved successfully.


25/11/26 20:49:44 WARN TaskSetManager: Lost task 0.0 in stage 50.0 (TID 2639) (item2vec-etl-sw-6bxj.us-central1-f.c.bookbridge-477802.internal executor 41): FetchFailed(BlockManagerId(39, item2vec-etl-sw-nqrx.us-central1-f.c.bookbridge-477802.internal, 7337, None), shuffleId=7, mapIndex=1, mapId=2486, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException
	at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1408)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1103)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at

In [24]:
print("--- Filtering and Saving Metadata for API ---")

# Slim projection of the metadata, applied BEFORE the join so that only the
# fields needed downstream are carried through it.
# NOTE(review): top_books_df is keyed by the reviews' `asin`, while the key
# used here is the metadata's `parent_asin`. The reviews schema carries both
# columns, so any book whose two ids differ would be dropped by the join below
# - confirm this is acceptable.
slim_columns = [
    F.col("parent_asin").alias("asin"),          # renamed to match the join key
    "title",
    "main_category",
    "average_rating",
    "rating_number",
    "description",                               # array of strings
    "categories",                                # array of strings
    F.col("author.name").alias("author_name"),   # flatten the nested struct
    F.col("images")[0].alias("primary_image"),   # first image object only
]
clean_meta_df = df_meta.select(*slim_columns)

# Placeholders for missing text fields, so consumers never receive nulls there.
default_labels = {
    "title": "Unknown Title",
    "main_category": "Uncategorized",
    "author_name": "Unknown Author",
}

# Keep only books present in the cached top-books table (broadcast as the
# small side of the join), then fill the placeholder values.
filtered_meta = (
    clean_meta_df
    .join(F.broadcast(top_books_df), on="asin", how="inner")
    .na.fill(default_labels)
)

# Preview the cleaned metadata.
filtered_meta.show(5)

--- Filtering and Saving Metadata for API ---


                                                                                

+----------+--------------------+-------------+--------------+-------------+--------------------+--------------------+-----------------+--------------------+
|      asin|               title|main_category|average_rating|rating_number|         description|          categories|      author_name|       primary_image|
+----------+--------------------+-------------+--------------+-------------+--------------------+--------------------+-----------------+--------------------+
|0316185361|Service: A Navy S...|        Books|           4.7|         3421|[Review, Praise f...|[Books, Biographi...|  Marcus Luttrell|{NULL, https://m....|
|1680450263|Make: Electronics...|        Books|           4.7|         1366|[From the Author,...|[Books, Engineeri...|    Charles Platt|{NULL, https://m....|
|1932225323|Four Centuries of...|        Books|           4.8|          133|[About the Author...|[Books, Education...|     David Barton|{NULL, https://m....|
|B003P2WETK|Inspector Imanish...| Buy a Kindle|     

In [25]:
# Save the cleaned metadata as JSON lines in a single part file.
# Spark writes a DIRECTORY at this path; with one partition it contains exactly
# one part-*.json file (plus Spark's marker files) for the API to load.
OUTPUT_META_PATH = f"gs://{BUCKET_NAME}/filtered_metadata"

print(f"Saving cleaned metadata to {OUTPUT_META_PATH}...")

# repartition(1), not coalesce(1): coalesce is a narrow dependency, so
# collapsing to one partition would also run the upstream metadata scan and
# join in a single task. repartition adds a shuffle of the (small) filtered
# result, which lets the upstream work stay parallel while still producing
# one output file.
filtered_meta.repartition(1).write.mode("overwrite").json(OUTPUT_META_PATH)

print("--- Metadata Processing Complete ---")

Saving cleaned metadata to gs://book_bridge/filtered_metadata...


                                                                                

--- Metadata Processing Complete ---
