In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Root folder of each medallion layer on the mounted storage.
bronze_layer = "/mnt/medallion/bronze"
silver_layer = "/mnt/medallion/silver"
gold_layer = "/mnt/medallion/gold"

# Location of the ecommerce-user Delta table in each layer (used as the
# start()/load() path of the streaming writes and reads).
bronze_layer_table = f"{bronze_layer}/ecommerce-user"
silver_layer_table = f"{silver_layer}/ecommerce-user"
gold_layer_table = f"{gold_layer}/ecommerce-user"

# Streaming checkpoint location for the query that writes each layer's table.
bronze_layer_checkpoint = f"{bronze_layer}/ecommerce-user-checkpoint"
silver_layer_checkpoint = f"{silver_layer}/ecommerce-user-checkpoint"
gold_layer_checkpoint = f"{gold_layer}/ecommerce-user-checkpoint"



In [None]:
# Placeholders: substitute the real Event Hubs connection string and hub name
# before running this cell.
connectionString = "InsertConnectionString"
eventHubName = "InsertEventHubName"

# Options passed to the "eventhubs" stream reader. The connection string is run
# through the connector's EventHubsUtils.encrypt, reached via the JVM gateway on `sc`.
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
ehConf['eventhubs.eventHubName'] = eventHubName
     

### Bronze layer

In [None]:

# Bronze ingest: open a streaming read from Azure Event Hubs using the options
# in `ehConf`. The chain is parenthesized so no line depends on a trailing
# backslash continuation.
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Show the incoming streaming data in the notebook for visualization/debugging.
# NOTE(review): this display and the write below appear to run as two separate
# streaming queries against the same Event Hub — confirm the consumer-group
# setup tolerates concurrent readers, or drop the display outside debugging.
df.display()

# Append the events, unmodified, to the bronze Delta table; progress is tracked
# at the bronze checkpoint location.
(
    df.writeStream
    .option("checkpointLocation", bronze_layer_checkpoint)
    .outputMode("append")
    .format("delta")
    .start(bronze_layer_table)
)

### Silver layer

In [None]:
# Schema applied by from_json to the event body: five fields, all strings.
ecommerce_user_schema = StructType([
    StructField(field_name, StringType())
    for field_name in ("browser", "deviceType", "ip", "os", "path")
])


In [None]:
from pyspark.sql.functions import to_date

# Silver transform: stream the bronze Delta table, parse the event body as JSON
# and flatten it into typed columns plus an event timestamp and date.
silver_df = (
    spark.readStream
    .format("delta")
    .load(bronze_layer_table)
    # Cast the body column to string, then parse it with ecommerce_user_schema.
    # NOTE(review): assumes the JSON keys match the schema's field names — confirm
    # against the producer.
    .withColumn("body", col("body").cast("string"))
    .withColumn("body", from_json(col("body"), ecommerce_user_schema))
    # Reference struct fields with the exact casing declared in the schema and
    # alias them explicitly, so resolution does not rely on case-insensitive
    # analysis while the output column names stay the same.
    .select(
        col("body.browser").alias("Browser"),
        col("body.deviceType").alias("DeviceType"),
        col("body.ip").alias("IP"),
        col("body.os").alias("OS"),
        col("body.path").alias("Path"),
        col("enqueuedTime").alias("TimeStamp"),
    )
    # Shift the enqueued time forward by 7 hours.
    # NOTE(review): presumably a fixed local-time offset — confirm the intended zone.
    .withColumn("TimeStamp", col("TimeStamp") + expr('INTERVAL 7 HOURS'))
    # Date part of the shifted timestamp.
    .withColumn("Date", to_date(col("TimeStamp")))
)

# Append the flattened rows to the silver Delta table.
(
    silver_df.writeStream
    .option("checkpointLocation", silver_layer_checkpoint)
    .outputMode("append")
    .format("delta")
    .start(silver_layer_table)
)

# Show the silver stream in the notebook.
silver_df.display()


### Gold layer

In [None]:
from pyspark.sql.functions import approx_count_distinct, hour

# Gold aggregate: approximate count of distinct IPs per one-hour window of
# TimeStamp, grouped by Date, Browser, DeviceType, Path and OS. The window
# column is replaced in the output by the hour of its start.
gold_df = (
    spark.readStream
    .format("delta")
    .load(silver_layer_table)
    .withWatermark("TimeStamp", "1 hour")
    .groupBy(
        "Date", "Browser", "DeviceType", "Path", "OS",
        window("TimeStamp", "1 hour"),
    )
    .agg(approx_count_distinct("IP").alias("unique_visitors"))
    .withColumn("hour", hour(col("window.start")))
    .select("Date", "hour", "Browser", "DeviceType", "Path", "unique_visitors", "OS")
)

# Write the aggregate to the gold Delta table in "complete" output mode.
# NOTE(review): in complete mode the watermark above may not bound aggregation
# state — confirm whether append mode (or a foreachBatch merge) is intended.
(
    gold_df.writeStream
    .option("checkpointLocation", gold_layer_checkpoint)
    .outputMode("complete")
    .format("delta")
    .start(gold_layer_table)
)

# Show the gold stream in the notebook.
gold_df.display()