# Configuration and SparkSession

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, count, sum, avg
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

In [None]:
# Configs for Hadoop in Windows.
# Only on Windows, and only when HADOOP_HOME is not already set: point
# HADOOP_HOME at a local winutils / Hadoop 3.3.6 folder and append its `bin`
# directory to PATH. Adjust the default path below to your installation.
HADOOP_HOME = os.getenv("HADOOP_HOME")
if (not HADOOP_HOME) and (os.name == "nt"):
    HADOOP_HOME = r"C:\winutils-master\hadoop-3.3.6"
    os.environ["HADOOP_HOME"] = HADOOP_HOME
    # Do not nest double quotes inside a double-quoted f-string here
    # (f"{os.environ["PATH"]}"): that form is a SyntaxError before Python 3.12.
    os.environ["PATH"] = os.pathsep.join(
        [os.environ.get("PATH", ""), os.path.join(HADOOP_HOME, "bin")]
    )
    print("Hadoop Native Libraries configured!")

In [3]:
# Configs for Kafka: every value can be overridden by an environment variable
# of the same name.
# Comma-separated list of bootstrap servers.
KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "localhost:9092, broker:29092")
# Base topic name; aggregated results are written to f"{KAFKA_TOPIC}_aggregated".
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "avroic")
# NOTE(review): not passed to the Kafka reader in this notebook — wire it up or drop it.
KAFKA_GROUP_ID = os.environ.get("KAFKA_GROUP_ID", "spark-aggregator")
# Used as spark.submit.deployMode when building the SparkSession.
DEPLOY_MODE = os.environ.get("DEPLOY_MODE", "client")

In [None]:
# Initialize SparkSession.
# The Kafka connector coordinate targets Spark 3.5.0 / Scala 2.12.
# NOTE(review): keep it in sync with the installed pyspark version.
# NOTE(review): deploy mode is normally chosen by spark-submit; setting it from
# an already-running Python driver may have no effect — confirm.
spark = (
    SparkSession.builder
    .appName("InteractionsAggregator")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .config("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
    .config("spark.submit.deployMode", DEPLOY_MODE)
    .getOrCreate()
)
print("Spark Session created")

# Kafka Stream

In [None]:
# Read from Kafka: a streaming DataFrame over the configured input topic.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    # Subscribe to KAFKA_TOPIC rather than a hard-coded name, so the input topic
    # follows the same setting as the f"{KAFKA_TOPIC}_aggregated" output topic.
    .option("subscribe", KAFKA_TOPIC)
    # NOTE: per the Spark Kafka integration guide, startingOffsets only applies
    # when the query starts without an existing checkpoint.
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka messages Schema (JSON payload carried in the message value)
schema = StructType([
    StructField('user_id', StringType()),
    StructField('item_id', StringType()),
    StructField('interaction_type', StringType()),
    StructField('timestamp', TimestampType())
])

# Parse Kafka messages: cast the value to a string, parse it as JSON with
# `schema`, then flatten the struct into top-level columns.
interactions = (
    df
    .select(from_json(col('value').cast('string'), schema).alias('data'))
    .select('data.*')
)

# Continuously append the parsed events to the "interactions" table, which the
# batch aggregations below query. Keep the StreamingQuery handle so the query
# can be inspected or stopped later (e.g. interactions_query.stop()).
interactions_query = (
    interactions.writeStream
    .option("checkpointLocation", "./spark-table-checkpoint/interactions")
    .toTable("interactions")
)

print("Reading from Kafka...")

In [None]:
# Sanity check: show the most recently timestamped interaction ingested so far.
# The table is filled asynchronously by the streaming query, so it may still be empty.
spark.table("interactions").orderBy(col("timestamp").desc()).limit(1).show()

# Aggregations

## agg_interactions_user

In [None]:
# Per-user interaction summary: one row per user with
#   - a count per interaction type (click / like / view / purchase),
#   - avg_interactions: the mean count over the interaction types that user has
#     actually used (types with no events are not part of the average),
#   - the overall total and the most recent interaction timestamp.
# Rows with a NULL user_id (e.g. messages from_json could not parse) are
# excluded. Otherwise they form a NULL group that would be published to Kafka
# with a NULL key.
sql_query = """
    with agg_interactions_user_interaction_type as (
        select user_id,
               interaction_type,
               count(*) as total_interactions,
               max(timestamp) as last_interaction
        from interactions
        where user_id is not null
        group by user_id, interaction_type
    )
    select user_id,
           sum(case when interaction_type='click' then total_interactions else 0 end) as total_click,
           sum(case when interaction_type='like' then total_interactions else 0 end) as total_like,
           sum(case when interaction_type='view' then total_interactions else 0 end) as total_view,
           sum(case when interaction_type='purchase' then total_interactions else 0 end) as total_purchase,
           round(avg(total_interactions), 2) as avg_interactions,
           sum(total_interactions) as total_interactions,
           max(last_interaction) as last_interaction
    from agg_interactions_user_interaction_type
    group by user_id
    order by total_interactions desc
"""

result = spark.sql(sql_query)
# Registered for the next aggregation. A temp view is not materialized: it is
# re-evaluated against the current contents of `interactions` whenever queried.
result.createOrReplaceTempView("agg_interactions_user")
result.show()

# result.write.parquet("output/agg_interactions_user.parquet", mode="overwrite")
# print("Result saved as parquet!")

# Publish one JSON message per user, keyed "agg_interactions_user-<user_id>".
(
    result
    .selectExpr("CONCAT('agg_interactions_user-', CAST(user_id AS STRING)) AS key", "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("topic", f"{KAFKA_TOPIC}_aggregated")
    .save()
)
print("Result sent to Kafka!")

## avg_interactions

In [None]:
# Global summary over the agg_interactions_user view: the average per-user
# total (note: not the same as the per-user avg_interactions column of that
# view) and the latest interaction timestamp across all users.
sql_query = """
    select round(avg(total_interactions), 2) as avg_interactions,
           max(last_interaction) as last_interaction
    from agg_interactions_user
"""

result = spark.sql(sql_query)
result.show()

# A single summary record, published under the fixed key 'avg_interactions'.
(
    result
    .selectExpr("'avg_interactions' AS key", "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("topic", f"{KAFKA_TOPIC}_aggregated")
    .save()
)
print("Result sent to Kafka!")

## agg_interactions_item

In [None]:
# Per-item spread of interaction counts. For each item, the largest and the
# smallest count among the interaction types that item has received (types with
# no events for the item do not contribute a 0 to min_interactions).
# Rows with a NULL item_id (e.g. messages from_json could not parse) are
# excluded so no record is published with a NULL key.
sql_query = """
    with agg_interactions_item_interaction_type as (
        select item_id,
               interaction_type,
               count(*) as total_interactions
        from interactions
        where item_id is not null
        group by item_id, interaction_type
    )
    select item_id,
           max(total_interactions) as max_interactions,
           min(total_interactions) as min_interactions
    from agg_interactions_item_interaction_type
    group by item_id
    order by item_id
"""

result = spark.sql(sql_query)
result.show()

# Publish one JSON message per item, keyed "agg_interactions_item-<item_id>".
(
    result
    .selectExpr("CONCAT('agg_interactions_item-', CAST(item_id AS STRING)) AS key", "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("topic", f"{KAFKA_TOPIC}_aggregated")
    .save()
)
print("Result sent to Kafka!")


# Stop SparkSession

In [35]:
# Stop any still-running streaming queries (such as the Kafka -> "interactions"
# ingestion) explicitly, then shut the session down.
for streaming_query in spark.streams.active:
    streaming_query.stop()
spark.stop()