In [None]:
from kafka import KafkaConsumer
import json
from pymongo import MongoClient

# Kafka source: broker address and the topic that carries click events.
KAFKA_BROKER = 'localhost:9092'
KAFKA_TOPIC = 'clickstream_topic'

# MongoDB sink: server URI plus the database/collection for raw click documents.
MONGO_URI = 'mongodb://localhost:27017/'
MONGO_DB = 'clickstream_db'
MONGO_COLLECTION = 'clickstream_collection'

# Module-level handles; `collection` is the target used by the ingestion code.
client = MongoClient(MONGO_URI)
db = client.get_database(MONGO_DB)
collection = db.get_collection(MONGO_COLLECTION)

# Function to ingest data from Kafka and store it in MongoDB
def ingest_to_mongodb():
    consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=[KAFKA_BROKER], value_deserializer=lambda x: json.loads(x.decode('utf-8')))
    
    for message in consumer:
        data = message.value
        click_data = {
            'click_id': data['click_id'],
            'user_id': data['user_id'],
            'timestamp': data['timestamp'],
            'url': data['url'],
            'country': data['country'],
            'city': data['city'],
            'browser': data['browser'],
            'os': data['os'],
            'device': data['device']
        }
        
        # Insert data into MongoDB
        collection.insert_one(click_data)
        print(f"Data ingested and stored for click_id: {data['click_id']}")

# Start consuming click events from Kafka and storing each one in MongoDB.
# NOTE(review): ingest_to_mongodb() loops over the Kafka consumer, so this call
# blocks for as long as the consumer keeps yielding messages; the cells below
# presumably run only after it returns or is interrupted.
ingest_to_mongodb()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, countDistinct

# Create the Spark session for this notebook, or reuse an already active one.
spark = (
    SparkSession.builder
    .appName('ClickstreamDataProcessor')
    .getOrCreate()
)

# MongoDB location; these values mirror the ingestion cell's configuration.
MONGO_COLLECTION = 'clickstream_collection'
MONGO_DB = 'clickstream_db'
MONGO_URI = 'mongodb://localhost:27017/'

# Function to process clickstream data using Spark
def process_clickstream_data():
    # Read data from MongoDB into a Spark DataFrame
    spark_uri = f"mongodb://127.0.0.1/{MONGO_DB}.{MONGO_COLLECTION}"
    clickstream_data = spark.read \
        .format("com.mongodb.spark.sql.DefaultSource") \
        .option("uri", spark_uri) \
        .load()

    # Perform aggregations by URL and country
    processed_data = clickstream_data.groupBy("url", "country").agg(
        avg(col("timestamp")).alias("average_time_spent"),
        count("timestamp").alias("click_count"),
        countDistinct("user_id").alias("unique_users_count")
    )

    return processed_data

# Run the aggregation and display the resulting DataFrame.
processed_data = process_clickstream_data()
processed_data.show()

# The Spark session is intentionally left running here: processed_data is a
# lazy DataFrame, and stopping the session now would make any later use of it
# (e.g. converting it for Elasticsearch indexing) fail. Call spark.stop() only
# after the last code that uses processed_data has run.


In [None]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Elasticsearch target: node address (host, port) and the destination index.
ELASTICSEARCH_HOST, ELASTICSEARCH_PORT = 'localhost', 9200
ELASTICSEARCH_INDEX = 'processed_clickstream_data'

# Function to index the processed data into Elasticsearch
def index_to_elasticsearch(processed_data):
    es = Elasticsearch([{'host': ELASTICSEARCH_HOST, 'port': ELASTICSEARCH_PORT}])

    # Convert the DataFrame to a list of Elasticsearch documents
    docs = processed_data.rdd.map(lambda row: {
        "url": row.url,
        "country": row.country,
        "average_time_spent": row.average_time_spent,
        "click_count": row.click_count,
        "unique_users_count": row.unique_users_count
    }).collect()

    # Bulk index the documents into Elasticsearch
    bulk_data = [
        {
            "_index": ELASTICSEARCH_INDEX,
            "_source": doc
        }
        for doc in docs
    ]

    bulk(es, bulk_data)
    print("Data indexed in Elasticsearch.")

# Index the processed data, then stop the Spark session: processed_data is a
# lazy DataFrame, so Spark must stay alive until this indexing has finished.
index_to_elasticsearch(processed_data)
spark.stop()


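The cells above implement the whole data pipeline: ingesting click events from Kafka, storing them in MongoDB, aggregating them per URL and country with Apache Spark, and optionally indexing the aggregated results into Elasticsearch.
Note that the ingestion cell keeps consuming from Kafka until it is interrupted, so the Spark and Elasticsearch cells run only after it stops.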

In [None]:
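If MongoDB is already used as the data store, indexing the processed data into Elasticsearch may not be necessary; keep that step only if Elasticsearch-specific capabilities (such as search over the aggregated results) are required.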