# Stream Read Notebook

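This notebook performs the following operations:

- Read the Delta Lake table at `/datalake/raw_data/raw_data_2` as a stream
- Stream its rows to the Apache Kafka topic `delta`, keyed by event timestamp with the remaining fields serialized as JSON
- Optionally, run a bounded variant that logs per-batch metrics (record count, processing time, throughput, average message size)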

In [None]:
# Stop any Spark session left over from a previous run so the next cell builds a
# fresh one: getOrCreate() would otherwise hand back the existing session, and
# settings such as the master URL, jars and executor resources only take effect
# when a new SparkContext is started.
# Guarded so the cell also works on a fresh kernel, where no session exists yet
# (a bare `spark.stop()` raises NameError there).
from pyspark.sql import SparkSession

_active_session = SparkSession.getActiveSession()
if _active_session is not None:
    _active_session.stop()

In [11]:
# Spark session for this notebook: standalone cluster master, Delta Lake with a
# Hive metastore catalog, Kafka connector jars, and reduced executor resources
# so that two applications can share the cluster.

import os
import warnings

from pyspark.sql import SparkSession

master_host = "192.168.0.144"

# Extra jars shipped to the application; spark.jars is a comma-separated list.
jar_dir = "/usr/local/spark/jars"
jar_names = [
    "delta-storage-3.2.0.jar",
    "delta-spark_2.12-3.2.0.jar",
    "kafka-clients-3.5.1.jar",
    "spark-sql-kafka-0-10_2.12-3.5.1.jar",
    "spark-token-provider-kafka-0-10_2.12-3.5.1.jar",
    "commons-pool2-2.11.1.jar",
]
spark_jars = ",".join(f"{jar_dir}/{name}" for name in jar_names)

# Hive metastore DB password comes from the MYSQL environment variable. The
# placeholder fallback is kept, but it is no longer silent.
metastore_password = os.getenv("MYSQL")
if metastore_password is None:
    warnings.warn(
        "Environment variable MYSQL is not set; the Hive metastore password "
        "falls back to the placeholder 'Default_Value'."
    )
    metastore_password = "Default_Value"

spark = (
    SparkSession.builder
    .appName("StreamRead")
    .master(f"spark://{master_host}:7077")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.hive.metastore.uris", f"thrift://{master_host}:9083")
    .config("spark.hadoop.javax.jdo.option.ConnectionURL", f"jdbc:mysql://{master_host}:3306/metastore_db")
    .config("spark.hadoop.javax.jdo.option.ConnectionDriverName", "com.mysql.cj.jdbc.Driver")
    .config("spark.hadoop.javax.jdo.option.ConnectionUserName", "lh")
    .config("spark.hadoop.javax.jdo.option.ConnectionPassword", metastore_password)
    .config("spark.jars", spark_jars)
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.HDFSLogStore")
    # Reduced resources so this application can run alongside a second one.
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "2")
    .config("spark.driver.cores", "6")
    .config("spark.driver.memory", "10g")
    .config("spark.driver.maxResultSize", "2g")
    .config("spark.hadoop.fs.defaultFS", f"hdfs://{master_host}:9000")
    .config("spark.databricks.delta.clusteredTable.enableClusteringTablePreview", "true")
    .config("spark.sql.debug.maxToStringFields", "1000")
    .config("spark.executor.instances", "2")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "2")
    .enableHiveSupport()
    .getOrCreate()
)



In [13]:
""" Create topic - Kafka on Worker Node 01
!/usr/local/bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic delta
"""

' Create topic\n!/usr/local/bin/kafka-topics.sh --create     --bootstrap-server localhost:9092     --replication-factor 1     --partitions 1     --topic delta\n'

In [None]:
from pyspark.sql.functions import col, to_json, struct, lit

# Kafka broker and target topic for the streamed rows.
kafka_bootstrap_servers = "192.168.0.145:9092"
kafka_topic = "delta"

# Fields packed, in this order, into the JSON body of every Kafka message.
payload_columns = [
    "value",
    "country",
    "event_id",
    "actor_id",
    "year",
    "month",
    "day",
    "product_id",
    "location_id",
    "department_id",
    "campaign_id",
    "customer_id",
]

# Incremental (streaming) read of the Delta table.
df = (
    spark.readStream
    .format("delta")
    .load("/datalake/raw_data/raw_data_2")
)

# The Kafka sink reads `key` and `value` columns: the timestamp (as text) becomes
# the message key and the payload fields are serialized to a JSON string.
message_key = col("timestamp").cast("string").alias("key")
message_value = to_json(struct(*[col(name) for name in payload_columns])).alias("value")
df_transformed = df.select(message_key, message_value)

query = (
    df_transformed.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("topic", kafka_topic)
    # TODO: placeholder checkpoint path; replace with a real location.
    .option("checkpointLocation", "/path/to/checkpoint/kafka")
    .start()
)

# Blocks the kernel until the query stops or fails, so later cells do not run
# while this stream is active.
query.awaitTermination()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_json, struct, lit, length
import time

kafka_bootstrap_servers = "192.168.0.145:9092"
kafka_topic = "delta"
# The checkpoint for this query must be set on writeStream. Spark refuses to start
# a foreachBatch query without one, and a checkpointLocation option on the
# per-batch `.write` has no effect. The path is kept separate from the plain
# Kafka query's checkpoint so the two queries do not share offset state.
# TODO: placeholder path; replace with a real location.
metrics_checkpoint_location = "/path/to/checkpoint/kafka_metrics"

df = spark.readStream \
    .format("delta") \
    .load("/datalake/raw_data/raw_data_2")

df_transformed = df.select(
    col("timestamp").cast("string").alias("key"),
    to_json(struct(
        col("value"),
        col("country"),
        col("event_id"),
        col("actor_id"),
        col("year"),
        col("month"),
        col("day"),
        col("product_id"),
        col("location_id"),
        col("department_id"),
        col("campaign_id"),
        col("customer_id")
    )).alias("value")
)

batch_count = 0
max_batches = 10
max_records_per_batch = 1000
# How often (seconds) the driver checks whether max_batches has been reached.
poll_interval_seconds = 5


def log_metrics(batch_df, batch_id):
    """Write one micro-batch to Kafka and print simple per-batch metrics.

    Called by Spark through ``foreachBatch`` once per micro-batch.

    Args:
        batch_df: the micro-batch DataFrame with ``key`` and ``value`` string columns.
        batch_id: Spark's identifier for this micro-batch.
    """
    global batch_count
    batch_count += 1

    # NOTE(review): rows beyond max_records_per_batch are NOT sent to Kafka, and
    # the stream still advances past them once this function returns. This is
    # meant for benchmarking only.
    # limit() without an ordering may pick different rows each time it is
    # evaluated, so the result is cached. That way the count, the size metric
    # and the Kafka write all see the same rows (barring cache eviction).
    limited_batch_df = batch_df.limit(max_records_per_batch).persist()
    try:
        num_records = limited_batch_df.count()
        # Casting the JSON string to binary gives its UTF-8 bytes, so length()
        # below counts bytes rather than characters.
        avg_message_size = limited_batch_df \
            .select(length(col("value").cast("binary")).alias("size")) \
            .agg({"size": "avg"}) \
            .collect()[0]["avg(size)"]

        # Time the Kafka write itself (previously only count() was timed).
        start_time = time.perf_counter()
        limited_batch_df \
            .write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
            .option("topic", kafka_topic) \
            .save()
        processing_time = time.perf_counter() - start_time
    finally:
        limited_batch_df.unpersist()

    print(f"Batch ID: {batch_id}")
    print(f"Number of records: {num_records}")
    print(f"Processing time (seconds): {processing_time}")
    print(f"Throughput (records/second): {num_records / processing_time if processing_time > 0 else float('inf')}")
    print(f"Average message size (bytes): {avg_message_size}")


query = df_transformed.writeStream \
    .foreachBatch(log_metrics) \
    .option("checkpointLocation", metrics_checkpoint_location) \
    .start()

# Stop the query from the driver rather than inside log_metrics: stop() waits
# for the query's execution thread, which is the thread running the callback,
# so calling it from there can hang.
while query.isActive and batch_count < max_batches:
    query.awaitTermination(poll_interval_seconds)
if query.isActive:
    query.stop()
# Returns once stopped; re-raises the error if the query failed on its own.
query.awaitTermination()