In [132]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, when, lit, approx_count_distinct, count, \
                                    avg, sum as spark_sum, expr
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType, DoubleType

In [133]:
spark = SparkSession.builder \
    .appName("KafkaStreamingExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8") \
    .getOrCreate()

spark

In [134]:
def create_hdfs_dir(path):
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    if not fs.exists(spark._jvm.org.apache.hadoop.fs.Path(path)):
        fs.mkdirs(spark._jvm.org.apache.hadoop.fs.Path(path))
        print(f"Created directory: {path}")
    else:
        print(f"Directory already exists: {path}")


In [135]:
directories = [
    "hdfs://localhost:9000/user/itversity/q-company_raw_layer",
    "hdfs://localhost:9000/user/itversity/q-company_standardized_layer",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/denormalized_model",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/normalized_model",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/denormalized_model/all_sales_fact_table",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/denormalized_model/offline_sales_fact_table",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/denormalized_model/online_fact_table",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/normalized_model/branch_dim",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/normalized_model/customer_dim",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/normalized_model/product_dim",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/normalized_model/sales_agent_dim",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/normalized_model/date_dim",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/normalized_model/online_sales_fact",
    "hdfs://localhost:9000/user/itversity/q-company_conformed_layer/normalized_model/offline_sales_fa",
    "hdfs://localhost:9000/user/itversity/q-company_customer_events",
    "hdfs://localhost:9000/user/itversity/checkpoints_custevents/",
    "hdfs://localhost:9000/user/itversity/q-company_customer_events/views",
    "hdfs://localhost:9000/user/itversity/q-company_customer_events/views/hourly_sales",
    "hdfs://localhost:9000/user/itversity/checkpoints_custevents/hourly_sales",
    "hdfs://localhost:9000/user/itversity/q-company_customer_events/views/category_performance",
    "hdfs://localhost:9000/user/itversity/checkpoints_custevents/category_performance",
    "hdfs://localhost:9000/user/itversity/q-company_customer_events/views/payment_method_trends",
    "hdfs://localhost:9000/user/itversity/checkpoints_custevents/payment_method_trends",
    "hdfs://localhost:9000/user/itversity/q-company_customer_events/views/customer_segmentation",
    "hdfs://localhost:9000/user/itversity/checkpoints_custevents/customer_segmentation",
    "hdfs://localhost:9000/user/itversity/q-company_customer_events/views/product_inventory",
    "hdfs://localhost:9000/user/itversity/checkpoints_custevents/product_inventory"
]

for directory in directories:
    create_hdfs_dir(directory)


Created directory: hdfs://localhost:9000/user/itversity/q-company_customer_events
Created directory: hdfs://localhost:9000/user/itversity/checkpoints_custevents/
Created directory: hdfs://localhost:9000/user/itversity/q-company_customer_events/views
Created directory: hdfs://localhost:9000/user/itversity/q-company_customer_events/views/hourly_sales
Created directory: hdfs://localhost:9000/user/itversity/checkpoints_custevents/hourly_sales
Created directory: hdfs://localhost:9000/user/itversity/q-company_customer_events/views/category_performance
Created directory: hdfs://localhost:9000/user/itversity/checkpoints_custevents/category_performance
Created directory: hdfs://localhost:9000/user/itversity/q-company_customer_events/views/payment_method_trends
Created directory: hdfs://localhost:9000/user/itversity/checkpoints_custevents/payment_method_trends
Created directory: hdfs://localhost:9000/user/itversity/q-company_customer_events/views/customer_segmentation
Created directory: hdfs://l

In [136]:
schema = StructType() \
    .add("eventType", StringType()) \
    .add("customerId", StringType()) \
    .add("productId", StringType()) \
    .add("timestamp", TimestampType()) \
    .add("metadata", StructType()
        .add("category", StringType())
        .add("source", StringType())
    ) \
    .add("quantity", IntegerType()) \
    .add("totalAmount", DoubleType()) \
    .add("paymentMethod", StringType()) \
    .add("recommendedProductId", StringType()) \
    .add("algorithm", StringType())



In [137]:
bootstrap_servers = "pkc-56d1g.eastus.azure.confluent.cloud:9092"
kafka_topic = "Emad_topic" # add topic name
kafka_username = "JUKQQM4ZM632RECA"
kafka_password = "UUkrPuSttgOC0U9lY3ZansNsKfN9fbxZPFwrGxudDrfv+knTD4rCwK+KdIzVPX0D"

In [138]:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config",
            f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_username}" password="{kafka_password}";') \
    .load()



In [139]:
json_df = df.selectExpr("CAST(value AS STRING)").select(from_json("value", schema).alias("data")).select("data.*")

json_df = json_df.withColumn("date", col("timestamp").cast("date"))


In [140]:
query_stream = json_df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://localhost:9000//user/itversity/q-company_customer_events") \
    .option("checkpointLocation", "hdfs://localhost:9000/user/itversity/checkpoints_custevents") \
    .option("maxRecordsPerFile", 1000000) \
    .partitionBy("eventType", "date") \
    .trigger(processingTime='1 minutes') \
    .start()


In [141]:
query_stream.awaitTermination()

KeyboardInterrupt: 

In [121]:
#This view aggregates hourly sales by product measures
hourly_sales = json_df \
    .withWatermark("timestamp", "2 hours") \
    .filter(col("eventType") == "purchase") \
    .groupBy(
        window("timestamp", "1 hour"),
        "productId"
    ) \
    .agg(
        count("*").alias("num_purchases"),
        spark_sum("totalAmount").alias("total_sales"),
        avg("totalAmount").alias("avg_sale_value")
    ) \
    .select(
        "productId",
        expr("window.start").alias("window_start"),
        expr("window.end").alias("window_end"),
        "num_purchases",
        "total_sales",
        "avg_sale_value"
    )

In [122]:
#This view aggregates sales data by product category over different time windows
category_performance = json_df \
    .withWatermark("timestamp", "2 hours") \
    .filter(col("eventType") == "purchase") \
    .groupBy(
        col("metadata.category"),
        window("timestamp", "1 hour")
    ) \
    .agg(
        count("*").alias("num_purchases"),
        spark_sum("totalAmount").alias("total_sales"),
        avg("totalAmount").alias("avg_sale_value"),
        approx_count_distinct("customerId").alias("unique_customers")
    ) \
    .select(
        "category",
        expr("window.start").alias("window_start"),
        expr("window.end").alias("window_end"),
        "num_purchases",
        "total_sales",
        "avg_sale_value",
        "unique_customers"
    )

In [123]:
#This view tracks the usage of different payment methods over time
payment_method_trends = json_df \
    .withWatermark("timestamp", "48 hours") \
    .filter(col("eventType") == "purchase") \
    .groupBy(
        "paymentMethod",
        window("timestamp", "1 day")
    ) \
    .agg(
        count("*").alias("num_transactions"),
        spark_sum("totalAmount").alias("total_amount"),
        avg("totalAmount").alias("avg_transaction_value")
    ) \
    .select(
        "paymentMethod",
        expr("window.start").alias("window_start"),
        expr("window.end").alias("window_end"),
        "num_transactions",
        "total_amount",
        "avg_transaction_value"
    )

In [124]:
#This view measures the effectiveness of product recommendations
customer_segmentation = json_df \
    .withWatermark("timestamp", "30 days") \
    .filter(col("eventType") == "purchase") \
    .groupBy(
        "customerId",
        window("timestamp", "30 days")
    ) \
    .agg(
        count("*").alias("purchase_frequency"),
        spark_sum("totalAmount").alias("total_spend")
    ) \
    .withColumn("segment", 
        when((col("purchase_frequency") > 10) & (col("total_spend") > 1000), "High Value")
        .when((col("purchase_frequency") > 5) & (col("total_spend") > 500), "Medium Value")
        .otherwise("Low Value")
    ) \
    .select(
        "customerId",
        expr("window.start").alias("window_start"),
        expr("window.end").alias("window_end"),
        "purchase_frequency",
        "total_spend",
        "segment"
    )

In [125]:
#This view keeps track of product inventory levels based on purchases
product_inventory = json_df \
    .withWatermark("timestamp", "2 hours") \
    .filter(col("eventType") == "purchase") \
    .groupBy(
        "productId",
        window("timestamp", "1 hour")
    ) \
    .agg(spark_sum("quantity").alias("quantity_sold")) \
    .select(
        "productId",
        expr("window.start").alias("window_start"),
        expr("window.end").alias("window_end"),
        "quantity_sold"
    )

In [126]:
def write_stream_view_to_hdfs(df, path, checkpoint_path):
    return df.writeStream \
        .outputMode("append") \
        .format("parquet") \
        .option("path", path) \
        .option("checkpointLocation", checkpoint_path) \
        .trigger(processingTime='5 minutes') \
        .option("maxRecordsPerFile", 100000) \
        .option("maxFilesPerTrigger", 10) \
        .option("compression", "snappy") \
        .start()


In [127]:
views = [
    (hourly_sales, "hourly_sales"),
    (category_performance, "category_performance"),
    (payment_method_trends, "payment_method_trends"),
    (customer_segmentation, "customer_segmentation"),
    (product_inventory, "product_inventory")
]

queries_views = [query_stream]
for view, name in views:
    query = write_stream_view_to_hdfs(
        view,
        f"hdfs://localhost:9000/user/itversity/q-company_customer_events/{name}",
        f"hdfs://localhost:9000/user/itversity/checkpoints_custevents/{name}"
    )
    queries_views.append(query)

In [128]:
for query in queries_views:
    query.awaitTermination()

KeyboardInterrupt: 

In [142]:
spark.stop()
