Data Engineer - Technical Assessment

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    approx_count_distinct,
    avg,
    col,
    concat_ws,
    count,
    from_json,
)
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

In [None]:
# Step 1: Data Ingestion from Kafka
# Obtain the session for this pipeline; getOrCreate() hands back an already
# running session when there is one instead of building a second.
spark = SparkSession.builder.appName("ClickstreamDataPipeline").getOrCreate()

# Open a streaming read against the clickstream topic on the local broker.
# Nothing is consumed here: the stream only runs once a query is started on a
# DataFrame derived from kafka_df.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "clickstream_topic")
    .load()
)

In [None]:
# Define the schema for parsing the incoming JSON data.
# Every field is nullable: from_json yields null for a key that is missing
# from a message rather than failing, so declaring a field the producer does
# not send is harmless.
schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("url", StringType(), True),
    StructField("ip_address", StringType(), True),
    StructField("user_agent", StringType(), True),
    # The aggregation step groups by "country" and averages "time_spent".
    # Without these two fields parsed_df has no such columns and that query
    # cannot be resolved.
    # NOTE(review): assumes the producer emits both keys and that time_spent
    # is numeric -- confirm against the topic's message contract; if they
    # are derived values, add the enrichment step before the aggregation.
    StructField("country", StringType(), True),
    StructField("time_spent", DoubleType(), True),
])

# The JSON payload is read from the Kafka "value" column: cast it to a
# string, parse it against the schema, then flatten the resulting struct so
# each schema field becomes a top-level column.
parsed_df = (
    kafka_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

In [None]:
# Write the parsed clickstream stream to the data storage system (Cassandra).
def write_to_cassandra(batch_df, batch_id):
    """Append one micro-batch of parsed click events to the Cassandra table.

    Used as the ``foreachBatch`` callback of the streaming query below: Spark
    invokes it once per micro-batch with a static (non-streaming) DataFrame,
    so the regular batch writer can be used.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier Spark assigns to the micro-batch (unused here).
    """
    batch_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="<table_name>", keyspace="<keyspace_name>") \
        .mode("append") \
        .save()


# parsed_df is a streaming DataFrame, so it has to be written through
# writeStream; calling .write on it is rejected by Spark. The checkpoint
# location must belong to this query alone -- do not share it with any other
# streaming query in the pipeline.
parsed_df.writeStream \
    .foreachBatch(write_to_cassandra) \
    .option("checkpointLocation", "/path/to/cassandra_checkpoint") \
    .start()


In [None]:
# Step 3: Periodic Data Processing
# For every (url, country) pair compute the number of unique users, the
# number of clicks and the average time spent.
# Requires parsed_df to carry "country" and "time_spent" columns; the query
# fails analysis otherwise.
processed_df = (
    parsed_df
    .groupBy("url", "country")
    .agg(
        # Exact distinct aggregations are not supported on streaming
        # DataFrames, so the unique-user figure is an approximate distinct
        # count rather than an exact one.
        approx_count_distinct("user_id").alias("unique_users"),
        # count() skips nulls, so rows with a null url add nothing here.
        count("url").alias("clicks"),
        avg("time_spent").alias("avg_time_spent"),
    )
)


In [None]:
# Step 4: Data Indexing in Elasticsearch
def index_in_elasticsearch(batch_df, batch_id):
    """Index one micro-batch of refreshed aggregates into Elasticsearch.

    Used as the ``foreachBatch`` callback of the streaming query below. In
    update mode a (url, country) group is emitted again every time its
    aggregates change, so each row is written under a deterministic document
    id and a re-emitted group replaces its earlier document instead of piling
    up as a duplicate.

    Args:
        batch_df: DataFrame with the aggregate rows that changed in this
            micro-batch.
        batch_id: Identifier Spark assigns to the micro-batch (unused here).
    """
    # concat_ws skips nulls, so a group with a null url or country still gets
    # a usable id. The "doc_id" column is stored in the document as well.
    batch_df.withColumn("doc_id", concat_ws("|", col("url"), col("country"))) \
        .write \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "localhost") \
        .option("es.port", "9200") \
        .option("es.resource", "clickstream_data") \
        .option("es.mapping.id", "doc_id") \
        .mode("append") \
        .save()


# processed_df is a streaming aggregation without a watermark, which Spark
# rejects in the default append output mode. Update mode is used instead, and
# because the Elasticsearch streaming sink accepts append mode only, each
# micro-batch is handed to the batch writer through foreachBatch.
processed_df.writeStream \
    .outputMode("update") \
    .foreachBatch(index_in_elasticsearch) \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start()

# Block the driver until any active streaming query stops or fails.
spark.streams.awaitAnyTermination()