In [None]:
"""
This section creates a SparkSession, which is the entry point to programming Spark with the DataFrame and Dataset API. 
It configures the SparkSession with necessary properties such as application name, stopping gracefully on shutdown, 
required package, shuffle partitions, and runs Spark in local mode using all available cores. 
"""

# Create the Spark Session
from pyspark.sql import SparkSession

# Build (or reuse) the session: local mode on all cores, Kafka source package
# pulled in via spark.jars.packages, 4 shuffle partitions, graceful shutdown.
spark = (
    SparkSession.builder
    .appName("Streaming from Kafka")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]")
    .getOrCreate()
)

In [None]:
"""
In this section, necessary PySpark functions and types are imported. 
The schema is defined for the JSON data that will be read from Kafka. 
The schema describes the structure of the data, including field names, data types, and nullability.
"""

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Layout of the JSON payload carried in each Kafka message: a top-level row key
# plus three nested structs. Every field is declared non-nullable.
schema = (
    StructType()
    .add("rowKey", StringType(), nullable=False)
    .add(
        "click_data",
        StructType()
        .add("userId", StringType(), nullable=False)
        .add("timestamp", TimestampType(), nullable=False)
        .add("url", StringType(), nullable=False),
        nullable=False,
    )
    .add(
        "geo_data",
        StructType()
        .add("country", StringType(), nullable=False)
        .add("city", StringType(), nullable=False),
        nullable=False,
    )
    .add(
        "user_agent_data",
        StructType()
        .add("browser", StringType(), nullable=False)
        .add("operatingSystem", StringType(), nullable=False)
        .add("device", StringType(), nullable=False),
        nullable=False,
    )
)

In [None]:
"""
In this section, the code reads streaming data from Kafka. 
It specifies the Kafka format, bootstrap servers, topic to subscribe to, starting offsets, and loads the data. 
Then, it selects the value column, casts it to a string, and aliases it as "value". 
Next, it applies the schema to the JSON data using the from_json function and aliases it as "data". 
Finally, it selects all columns from the "data" struct.
"""

# Subscribe to the Kafka topic from the earliest offset, decode each message
# value as a string, parse it as JSON against `schema`, and flatten the parsed
# struct into top-level columns.
df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "test2",
        "startingOffsets": "earliest",
    })
    .load()
    .select(col("value").cast("string").alias("value"))
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

In [None]:
"""
In this section, the streaming query is defined to write the processed data to the console. 
It specifies the output mode as "append" (to print only new rows), the output format as "console", 
and the trigger interval as "20 seconds" (the time interval to process the data). 
Finally, the query is started, and the program waits until termination using awaitTermination().
"""

# Start the streaming query: append-mode console sink, with a micro-batch
# triggered every 20 seconds of processing time.
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime="20 seconds") \
    .start()

# Block this cell until the query terminates. If the wait is cut short (for
# example by interrupting the notebook cell) or raises because the query failed,
# stop the query explicitly so it does not keep running in the background.
try:
    query.awaitTermination()
finally:
    query.stop()

In [None]:
# Note: This section moves the data to Elasticsearch.
# If we know the Elasticsearch path (host and port), we can save the data to Elasticsearch.

"""
In this section, the aggregatedDf DataFrame (which represents the aggregated data) 
is written to Elasticsearch using the specified configurations.
"""

# Connector settings for the Elasticsearch write below.
esConf = {
    # Elasticsearch serves its REST API on port 9200 by default; 5601 is
    # Kibana's default port, which does not accept index requests.
    "es.nodes": "localhost:9200",
    "es.resource": "clickstream",
    # NOTE(review): upsert needs a document-id column; confirm that the
    # DataFrame being written really has an "id" column.
    "es.mapping.id": "id",
    "es.mapping.date.rich": "false",
    "es.write.operation": "upsert",
    "es.nodes.wan.only": "true"
}

# Write the aggregated DataFrame to Elasticsearch as a batch write.
# NOTE(review): `aggregatedDf` is not defined in the cells visible here; it
# must be created before this cell runs. The "org.elasticsearch.spark.sql"
# format also needs the Elasticsearch Spark connector on the classpath, and
# the SparkSession's spark.jars.packages lists only the Kafka package.
aggregatedDf.write \
    .format("org.elasticsearch.spark.sql") \
    .options(**esConf) \
    .save()