In [1]:
import os

from pyspark.sql import SparkSession
# ADDED 'udf' to the import list
from pyspark.sql.functions import (
    col,
    from_json,
    length,
    timestamp_seconds,
    to_timestamp,
    udf,
    when,
)
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, DoubleType
from textblob import TextBlob

# Maven coordinates Spark resolves when the session starts:
#   Kafka source for Structured Streaming, Delta Lake, and the S3A filesystem + AWS SDK.
# NOTE(review): versions presumably chosen for a Spark 3.5 / Scala 2.12 / Hadoop 3.3.4
# runtime — confirm they match the cluster image before bumping any of them.
required_packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    "io.delta:delta-spark_2.12:3.2.0",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
]
spark_packages = ",".join(required_packages)

# Delta needs both its SQL extension and its catalog registered on the session.
# NOTE(review): static settings such as spark.jars.packages / spark.sql.extensions only take
# effect when getOrCreate() actually creates the session — restart the kernel after changing them.
spark = (
    SparkSession.builder
    .appName("AmazonReviewsStreamingETL")
    .config("spark.jars.packages", spark_packages)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Keep notebook output readable: only warnings and errors from the JVM side.
spark.sparkContext.setLogLevel("WARN")

print("✅ Spark Session initialized successfully with Kafka, Delta Lake, and S3 connectors!")

✅ Spark Session initialized successfully with Kafka, Delta Lake, and S3 connectors!


In [2]:
# JSON layout expected in each Kafka message value: one Amazon review per record.
# Every field is nullable so a missing key yields null instead of failing the parse.
review_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Id", LongType()),
        ("ProductId", StringType()),
        ("UserId", StringType()),
        ("ProfileName", StringType()),
        ("HelpfulnessNumerator", IntegerType()),
        ("HelpfulnessDenominator", IntegerType()),
        ("Score", IntegerType()),
        ("Time", LongType()),
        ("Summary", StringType()),
        ("Text", StringType()),
    )
])

# Streaming source over the 'ecommerce-events' topic. The broker address can be overridden
# via KAFKA_BOOTSTRAP_SERVERS. 'earliest' only applies to a query starting without a
# checkpoint; a restarted query resumes from its checkpointed offsets.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092"))
    .option("subscribe", "ecommerce-events")
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the binary Kafka `value` to text, parse it against review_schema, then flatten the
# struct into top-level columns. Unparseable payloads surface as rows of null fields.
parsed_df = (
    kafka_df
    .select(from_json(col("value").cast("string"), review_schema).alias("data"))
    .select("data.*")
)

print("✅ Kafka stream connected and data parsing is configured.")

✅ Kafka stream connected and data parsing is configured.


In [3]:
# Bronze: persist the parsed review records unchanged in an append-only Delta table.
# The checkpoint tracks Kafka offsets so a restart resumes where this query stopped.
bronze_path = "s3a://bronze/amazon_reviews"
bronze_checkpoint_path = "s3a://bronze/checkpoints/amazon_reviews"

bronze_query = (
    parsed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", bronze_checkpoint_path)
    .start(bronze_path)
)

print(f"✅ Streaming to Bronze layer started. Path: {bronze_path}")

✅ Streaming to Bronze layer started. Path: s3a://bronze/amazon_reviews


In [4]:
# Silver: cleaned, typed review records plus derived features.
silver_df = (
    parsed_df
    # Messages that from_json() cannot parse come through as rows whose fields (Id included)
    # are null. They carry no review, so drop them here instead of persisting empty rows.
    .filter(col("Id").isNotNull())
    # timestamp_seconds() makes the epoch unit explicit. It does not depend on to_timestamp()'s
    # cast fallback for numeric input, whose result type follows spark.sql.timestampType.
    # NOTE(review): assumes the producer sends Unix epoch *seconds* in `Time` — confirm upstream.
    .withColumn("review_timestamp", timestamp_seconds(col("Time")))
    # HelpfulnessNumerator / HelpfulnessDenominator, or 0.0 when the denominator is
    # non-positive or null, so the ratio is never computed against a zero/null divisor.
    .withColumn(
        "helpfulness_ratio",
        when(
            col("HelpfulnessDenominator") > 0,
            col("HelpfulnessNumerator") / col("HelpfulnessDenominator"),
        )
        .otherwise(0)
        .cast(DoubleType()),
    )
    # `Time` is superseded by review_timestamp; ProfileName is not carried into silver.
    .drop("Time", "ProfileName")
)

silver_path = "s3a://silver/amazon_reviews"
silver_checkpoint_path = "s3a://silver/checkpoints/amazon_reviews"

# Both changes above are stateless (filter / projection), so an existing checkpoint and
# the table schema remain compatible.
silver_query = (
    silver_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", silver_checkpoint_path)
    .start(silver_path)
)

print(f"✅ Streaming to Silver layer started. Path: {silver_path}")

✅ Streaming to Silver layer started. Path: s3a://silver/amazon_reviews


In [5]:
# Plain-Python sentiment scorer; it is wrapped as a Spark UDF further down.
def get_sentiment_score(text):
    """Return the TextBlob polarity of ``text``.

    Falsy input (``None`` or an empty string) short-circuits to ``0.0`` without
    invoking TextBlob; otherwise the value of ``TextBlob(text).sentiment.polarity``
    is returned as-is.
    """
    if not text:
        return 0.0
    return TextBlob(text).sentiment.polarity

# Expose the Python scorer to Spark; results are typed as DoubleType.
# NOTE(review): the UDF runs in executor Python workers, so textblob must be installed there too.
sentiment_udf = udf(get_sentiment_score, DoubleType())

gold_path = "s3a://gold/amazon_reviews_sentiment"
gold_checkpoint_path = "s3a://gold/checkpoints/amazon_reviews_sentiment"

# Gold: every silver column plus a per-review sentiment_score computed from `Text`.
# NOTE(review): this builds on the silver *DataFrame*, not the silver Delta table, so it is
# a separate query that consumes Kafka on its own under its own checkpoint.
gold_df = silver_df.withColumn("sentiment_score", sentiment_udf(col("Text")))

gold_query = (
    gold_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", gold_checkpoint_path)
    .start(gold_path)
)

print(f"✅ Streaming to Gold layer started. Path: {gold_path}")

✅ Streaming to Gold layer started. Path: s3a://gold/amazon_reviews_sentiment


In [6]:
# Status snapshot of every streaming query running in this Spark session
# (bronze, silver and gold once the cells above have been executed).
active_queries = spark.streams.active
for q in active_queries:
    print(f"Query ID: {q.id}, Name: {q.name}, Status: {q.status}")

# Stopping queries:
#   a single query -> spark.streams.get(query_id).stop()
#   all of them    -> call .stop() on each entry of spark.streams.active

Query ID: 2f09b8ae-c055-48ee-b69a-193b0d1fcd59, Name: None, Status: {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
Query ID: 518865a5-c25d-48ae-93b4-c65128e09d33, Name: None, Status: {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
Query ID: b102dc84-53b8-492f-a41d-245e9d7f602b, Name: None, Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': True}
