In [None]:
# Install necessary packages (uncomment on a fresh Colab / Debian-based VM).
# !sudo apt-get update -qq
# !sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null

# # Download and extract PySpark 3.5.2
# !wget -q https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
# !tar xf spark-3.5.2-bin-hadoop3.tgz

# # Install findspark
# !pip install -q findspark
# # Pin pyspark to the same release as the spark-sql-kafka connector requested in
# # create_spark_connection (3.5.2 / Scala 2.12); an unpinned install may pull a
# # release built against a different Scala version.
# %pip install -q pyspark==3.5.2

import os

# Default locations produced by the (commented-out) install commands above.
# NOTE(review): this JAVA_HOME path assumes a Debian/Ubuntu amd64 host — confirm elsewhere.
JAVA_HOME_DEFAULT = "/usr/lib/jvm/java-8-openjdk-amd64"
# Resolved to an absolute path so a later change of working directory cannot invalidate it.
SPARK_HOME_DEFAULT = os.path.abspath("./spark-3.5.2-bin-hadoop3")


def set_env_dir_default(var, path):
    """Point environment variable ``var`` at directory ``path`` if that is safe.

    The variable is only set when it is not already defined (so a value supplied by
    the user/environment wins) and ``path`` is an existing directory. Pointing
    SPARK_HOME at a missing directory would otherwise break a pip-installed pyspark,
    which launches ``$SPARK_HOME/bin/spark-submit`` when SPARK_HOME is set.

    Args:
        var: environment variable name, e.g. "SPARK_HOME".
        path: candidate directory.

    Returns:
        The value of ``var`` in effect after the call, or None if it is unset.
    """
    if var not in os.environ and os.path.isdir(path):
        os.environ[var] = path
    return os.environ.get(var)


set_env_dir_default("JAVA_HOME", JAVA_HOME_DEFAULT)
set_env_dir_default("SPARK_HOME", SPARK_HOME_DEFAULT)


In [None]:

import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col , regexp_replace
from pyspark.sql.types import StructType, StructField, StringType

In [None]:
def create_spark_connection(app_name="SparkDataStreaming", log_level="ERROR"):
    """Create (or reuse) a SparkSession set up for the Kafka structured-streaming source.

    Args:
        app_name: Spark application name.
        log_level: level passed to ``SparkContext.setLogLevel``.

    Returns:
        The SparkSession, or None if it could not be fully initialised. The error
        and its traceback are logged rather than raised.
    """
    # The Kafka connector must match the Spark build in use (3.5.2, Scala 2.12).
    # NOTE(review): the reason for jnr-posix is not evident here — confirm it is still needed.
    packages = ",".join([
        "com.github.jnr:jnr-posix:3.1.15",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2",
    ])

    try:
        s_conn = (
            SparkSession.builder
            .appName(app_name)
            # Raised from Spark's 120s default.
            .config("spark.network.timeout", "800s")
            .config("spark.jars.packages", packages)
            .getOrCreate()
        )
        s_conn.sparkContext.setLogLevel(log_level)
    except Exception as e:
        # Return None on any failure (previously a session could be returned after
        # logging that it couldn't be created); keep the traceback for debugging.
        logging.error(f"Couldn't create the spark session due to exception {e}", exc_info=True)
        return None

    logging.info("Spark connection created successfully!")
    return s_conn
    

In [None]:
def connect_to_kafka(spark_conn, bootstrap_servers=None, topic="e-commerce",
                     starting_offsets="earliest"):
    """Build a streaming DataFrame that reads ``topic`` from Kafka.

    Args:
        spark_conn: SparkSession; the spark-sql-kafka connector must be on its classpath.
        bootstrap_servers: Kafka bootstrap servers. When None, the
            ``KAFKA_BOOTSTRAP_SERVERS`` environment variable is used, falling back to
            the broker address this notebook was originally written against.
        topic: Kafka topic to subscribe to.
        starting_offsets: value for the Kafka ``startingOffsets`` option
            (e.g. "earliest" or "latest").

    Returns:
        The streaming DataFrame, or None if it could not be created (error logged).
    """
    if bootstrap_servers is None:
        # NOTE(review): avoid committing a public broker address; prefer the env var.
        bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "44.215.213.113:9092")

    try:
        spark_df = (
            spark_conn.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_servers)
            .option("subscribe", topic)
            .option("startingOffsets", starting_offsets)
            .load()
        )
    except Exception as e:
        # ERROR (not WARNING): the pipeline cannot proceed without this source.
        logging.error(f"kafka dataframe could not be created because: {e}", exc_info=True)
        return None

    logging.info("kafka dataframe created successfully")
    return spark_df

In [None]:
def create_selection_df_from_kafka(spark_df):
    """Parse Kafka record values as JSON product reviews and coerce numeric fields.

    Args:
        spark_df: DataFrame from the Kafka source; only its ``value`` column is used.

    Returns:
        DataFrame with one column per field listed below. ``discounted_price``,
        ``actual_price``, ``discount_percentage`` and ``rating_count`` are int and
        ``rating`` is float. Values that cannot be parsed become null.
    """
    # Every field arrives as a string; numeric ones are cleaned and cast further down.
    field_names = (
        "product_id", "product_name", "category",
        "discounted_price", "actual_price", "discount_percentage",
        "rating", "rating_count", "about_product",
        "user_id", "user_name", "review_id",
        "review_title", "review_content", "img_link", "product_link",
    )
    schema = StructType([StructField(name, StringType(), True) for name in field_names])

    parsed_df = (
        spark_df.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), schema).alias("data"))
        .select("data.*")
    )

    # Normalise column names (spaces/hyphens -> underscores) in a single projection.
    # With the fixed schema above this is currently a no-op; kept as a safeguard.
    parsed_df = parsed_df.toDF(
        *[name.replace(" ", "_").replace("-", "_") for name in parsed_df.columns]
    )

    processed_df = (
        parsed_df
        # Prices: drop currency symbols and thousands separators but keep the decimal
        # point, so "1,099.00" -> 1099 rather than 109900; going through double makes
        # the truncation to int explicit.
        # NOTE(review): assumes any '.' in a price is a decimal point (a "Rs." prefix
        # would be misparsed) — confirm against the producer's format.
        .withColumn("discounted_price",
                    regexp_replace("discounted_price", "[^0-9.]", "").cast("double").cast("int"))
        .withColumn("actual_price",
                    regexp_replace("actual_price", "[^0-9.]", "").cast("double").cast("int"))
        .withColumn("discount_percentage",
                    regexp_replace("discount_percentage", "%", "").cast("int"))
        .withColumn("rating", col("rating").cast("float"))
        # Counts may contain thousands separators (e.g. "24,269"), which a plain int
        # cast turns into null; strip non-digits first.
        .withColumn("rating_count",
                    regexp_replace("rating_count", "[^0-9]", "").cast("int"))
    )

    return processed_df

In [None]:
def write_stream_to_console(selection_df, output_mode="append", truncate=True,
                            await_termination=True):
    """Start a console sink for ``selection_df`` and optionally block until it stops.

    Args:
        selection_df: streaming DataFrame to print.
        output_mode: structured-streaming output mode ("append", "update", "complete").
        truncate: passed to the console sink's ``truncate`` option; False prints full
            cell contents instead of clipping long strings.
        await_termination: if True, block on ``awaitTermination()``. Set False to get
            the query back immediately (e.g. to call ``query.stop()`` from another cell).

    Returns:
        The started StreamingQuery, or None if it could not be started (error logged).

    Raises:
        KeyboardInterrupt: re-raised after stopping the query when the wait is interrupted.
    """
    try:
        query = (
            selection_df.writeStream
            .outputMode(output_mode)
            .format("console")
            .option("truncate", truncate)
            .start()
        )
    except Exception as e:
        logging.error(f"Stream could not be started due to exception: {e}", exc_info=True)
        return None

    # Log before blocking; awaitTermination() only returns once the stream ends.
    logging.info("Stream started successfully and writing to console")

    if await_termination:
        try:
            query.awaitTermination()
        except KeyboardInterrupt:
            # Interrupting the kernel only stops the Python-side wait; stop the query too
            # so it does not keep running in the background.
            query.stop()
            raise
        except Exception as e:
            logging.error(f"Stream terminated due to exception: {e}", exc_info=True)

    return query

In [None]:
if __name__ == "__main__":
    # The helper functions report progress with logging.info, which the root logger
    # (default level WARNING) would otherwise drop. basicConfig is a no-op if logging
    # is already configured. Keep py4j's own INFO chatter out of the output.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    logging.getLogger("py4j").setLevel(logging.WARNING)

    # Pipeline: Spark session -> Kafka source -> parsed/typed frame -> console sink.
    spark_conn = create_spark_connection()
    if spark_conn is None:
        logging.error("Aborting pipeline: no Spark session available")
    else:
        spark_df = connect_to_kafka(spark_conn)
        if spark_df is None:
            logging.error("Aborting pipeline: Kafka source could not be created")
        else:
            selection_df = create_selection_df_from_kafka(spark_df)
            write_stream_to_console(selection_df)


                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW9H4J1|Wayona Ny

[Stage 3:>                                                          (0 + 1) / 1]

-------------------------------------------
Batch: 3
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|       about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B00NH11PEY|AmazonBasics

[Stage 4:>                                                          (0 + 1) / 1]

-------------------------------------------
Batch: 4
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW9H4J1|Wayona Ny

                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|       about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW1Y6XV|Wayona Nylon

                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW9H4J1|Wayona Ny

                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B082T6V3DT|AmazonBas

[Stage 8:>                                                          (0 + 1) / 1]

-------------------------------------------
Batch: 8
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW9H4J1|Wayona Ny

                                                                                

-------------------------------------------
Batch: 9
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|       about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B08DPLCM6T|LG 80 cm (32

                                                                                

-------------------------------------------
Batch: 10
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B08FYB5HHK|TP-Link 

                                                                                

-------------------------------------------
Batch: 11
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW9H4J1|Wayona N

                                                                                

-------------------------------------------
Batch: 12
-------------------------------------------
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B08PV1X771|Samsung 