[Reference](https://medium.com/@simardeep.oberoi/building-a-robust-real-time-e-commerce-analytics-pipeline-a-data-engineering-project-b57db9e9bfc4)

In [1]:
import json
import random
import time
from datetime import datetime
from faker import Faker
from kafka import KafkaProducer
import os
from concurrent.futures import ThreadPoolExecutor


# Shared Faker instance used by every generate_* function below.
fake = Faker()

# Kafka configuration: KAFKA_BROKER (set by docker-compose for this service)
# overrides the default broker address.
kafka_broker = os.getenv('KAFKA_BROKER', 'broker:9092')
producer = KafkaProducer(
    bootstrap_servers=[kafka_broker],
    # Every record value is published as UTF-8 encoded JSON.
    value_serializer=lambda record: json.dumps(record).encode('utf-8'),
)

# Ids of every customer / product generated so far. Transactions, product
# views and user interactions draw their foreign keys from these pools.
customers, products = [], []
# Generate Customer Data
def generate_customer():
    """Create a fake customer profile and register its id.

    Side effect: the new ``customer_id`` is appended to the module-level
    ``customers`` pool so later events can reference this customer.

    Returns:
        dict: JSON-serialisable profile (dates rendered with ``isoformat``).
    """
    profile = dict(
        customer_id=fake.uuid4(),
        name=fake.name(),
        email=fake.email(),
        location=fake.address(),
        age=random.randint(18, 70),
        gender=random.choice(("Male", "Female", "Other")),
        account_created=fake.past_date().isoformat(),
        last_login=fake.date_time_this_month().isoformat(),
    )
    customers.append(profile["customer_id"])
    return profile
# Generate Product Data
def generate_product():
    """Create a fake catalogue product and register its id.

    Side effect: the new ``product_id`` is appended to the module-level
    ``products`` pool so later events can reference this product.

    Returns:
        dict: product with a price in [10, 500] (2 decimals), stock in
        [0, 100] and a rating in [1, 5] (1 decimal).
    """
    product_id = fake.uuid4()
    record = {
        "product_id": product_id,
        "name": fake.word().title(),
        "category": random.choice(('Electronics', 'Books', 'Clothing', 'Home & Garden')),
        "price": round(random.uniform(10, 500), 2),
        "stock_quantity": random.randint(0, 100),
        "supplier": fake.company(),
        "rating": round(random.uniform(1, 5), 1),
    }
    products.append(product_id)
    return record
# Generate Transaction Data
def generate_transaction():
    """Create a fake purchase between an existing customer and product.

    Both ids come from the module-level ``customers`` / ``products`` pools,
    which must therefore be non-empty (``random.choice`` raises otherwise).
    """
    buyer = random.choice(customers)
    item = random.choice(products)
    return dict(
        transaction_id=fake.uuid4(),
        customer_id=buyer,
        product_id=item,
        quantity=random.randint(1, 5),
        date_time=fake.date_time_this_year().isoformat(),
        status=random.choice(["completed", "pending", "canceled"]),
        payment_method=random.choice(["credit card", "PayPal", "bank transfer"]),
    )
# Generate Product View Data
def generate_product_view():
    """Create a fake page view of an existing product by an existing customer.

    Requires both id pools to be non-empty.
    """
    view = {}
    view["view_id"] = fake.uuid4()
    view["customer_id"] = random.choice(customers)
    view["product_id"] = random.choice(products)
    view["timestamp"] = fake.date_time_this_year().isoformat()
    view["view_duration"] = random.randint(10, 300)  # seconds spent on the page
    return view
# Generate System Log Data
def generate_system_log():
    """Create a fake application log line with a random severity."""
    log_id = fake.uuid4()
    logged_at = fake.date_time_this_year().isoformat()
    severity = random.choice(("INFO", "WARNING", "ERROR"))
    return {"log_id": log_id, "timestamp": logged_at, "level": severity, "message": fake.sentence()}
# Generate User Interaction Data
def generate_user_interaction():
    """Create a fake interaction between an existing customer and product.

    The interaction type is one of ``wishlist_addition``, ``review`` or
    ``rating``. Only reviews carry free-text ``details``; all other types get
    ``None``.

    Requires both module-level id pools (``customers``, ``products``) to be
    non-empty, since ``random.choice`` raises ``IndexError`` on an empty list.
    """
    interaction_types = ["wishlist_addition", "review", "rating"]
    interaction_id = fake.uuid4()
    customer_id = random.choice(customers)
    product_id = random.choice(products)
    timestamp = fake.date_time_this_year().isoformat()
    interaction_type = random.choice(interaction_types)
    return {
        "interaction_id": interaction_id,
        "customer_id": customer_id,
        "product_id": product_id,
        "timestamp": timestamp,
        "interaction_type": interaction_type,
        # Compare the *chosen* type, not the list of all types (a list never
        # equals "review"), so reviews actually get text.
        "details": fake.sentence() if interaction_type == "review" else None,
    }
# Function to send data to Kafka
def send_data():
    """Publish one round of synthetic events to Kafka.

    Every call first registers exactly one new customer or product (a coin
    flip). Once both id pools are non-empty it also emits a transaction, a
    product view and a user interaction that reference them. A system log
    record is published on every call.
    """
    # Grow the catalogue: a coin flip decides between a customer and a product.
    if random.random() >= 0.5:
        producer.send('ecommerce_products', value=generate_product())
    else:
        producer.send('ecommerce_customers', value=generate_customer())

    # Events referencing existing ids are only possible once both pools are populated.
    if customers and products:
        for topic, make_record in (
            ('ecommerce_transactions', generate_transaction),
            ('ecommerce_product_views', generate_product_view),
            ('ecommerce_user_interactions', generate_user_interaction),
        ):
            producer.send(topic, value=make_record())

    producer.send('ecommerce_system_logs', value=generate_system_log())
def _report_send_failure(future):
    """Print the exception raised by a finished ``send_data`` task, if any.

    ``executor.submit`` stores a task's exception on its Future. Nothing else
    in this script inspects those futures, so without this callback failures
    (e.g. an error raised by ``producer.send``) would disappear silently.
    """
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        print(f"send_data failed: {error!r}")


# Parallel Data Generation: submit one send_data job every 10-100 ms, forever.
try:
    with ThreadPoolExecutor(max_workers=5) as executor:
        while True:
            executor.submit(send_data).add_done_callback(_report_send_failure)
            time.sleep(random.uniform(0.01, 0.1))
finally:
    # producer.send only enqueues records; flush delivers whatever is still
    # buffered before the process exits (e.g. after Ctrl-C).
    producer.flush()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql import DataFrame
import logging

# Initialize logging. Only ERROR and above is emitted at this level, so the
# logger.info(...) progress messages in this script stay silent unless the
# level is lowered.
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)


# Initialize the Spark session. The spark.es.* settings point the
# elasticsearch-spark connector at the "elasticsearch" host on port 9200.
# A parenthesized chain is used instead of trailing backslashes: a "\\" at the
# end of a line is a SyntaxError.
spark = (
    SparkSession.builder
    .appName("Ecommerce Data Analysis")
    .config("spark.es.nodes", "elasticsearch")
    .config("spark.es.port", "9200")
    .config("spark.es.nodes.wan.only", "true")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

# Kafka configuration: comma-separated bootstrap list (hard-coded; must match
# the brokers' internal listeners defined in docker-compose).
kafka_bootstrap_servers = "broker:29092,broker2:29094"


# Customer profiles from the 'ecommerce_customers' topic. last_login is parsed
# as a timestamp and is the event-time column (2-hour watermark) of this
# stream; account_created is kept as a plain string.
customerSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("customer_id", StringType()),
        ("name", StringType()),
        ("email", StringType()),
        ("location", StringType()),
        ("age", IntegerType()),
        ("gender", StringType()),
        ("account_created", StringType()),
        ("last_login", TimestampType()),
    )
])
customerDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", "ecommerce_customers")
    .option("startingOffsets", "earliest")  # replay the topic from its first record
    .load()
    # Kafka delivers the payload as bytes: decode it, then unpack the JSON fields.
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", customerSchema).alias("data"))
    .select("data.*")
    .withWatermark("last_login", "2 hours")
)


# Read data from the 'ecommerce_products' topic.
productSchema = StructType([
    StructField("product_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("stock_quantity", IntegerType(), True),
    StructField("supplier", StringType(), True),
    StructField("rating", DoubleType(), True)
])
# Parenthesized chain: a trailing "\\" on each line is a SyntaxError.
productDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", "ecommerce_products")
    .option("startingOffsets", "earliest")
    .load()
    # Decode the Kafka value bytes and unpack the JSON payload into columns.
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", productSchema).alias("data"))
    .select("data.*")
    # Product records carry no timestamp of their own, so each row is stamped
    # with the time Spark processed it and that column is watermarked.
    .withColumn("processingTime", current_timestamp())
)
productDF = productDF.withWatermark("processingTime", "2 hours")


# Read data from the 'ecommerce_transactions' topic.
transactionSchema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("date_time", TimestampType(), True),
    StructField("status", StringType(), True),
    StructField("payment_method", StringType(), True)
])
# Parenthesized chain: a trailing "\\" on each line is a SyntaxError.
transactionDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", "ecommerce_transactions")
    .option("startingOffsets", "earliest")
    .load()
    # Decode the Kafka value bytes and unpack the JSON payload into columns.
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", transactionSchema).alias("data"))
    .select("data.*")
)
# Windowing below uses Spark's processing time, not the record's date_time.
transactionDF = transactionDF.withColumn("processingTime", current_timestamp())
transactionDF = transactionDF.withWatermark("processingTime", "2 hours")


# Read data from the 'ecommerce_product_views' topic.
productViewSchema = (
    StructType()
    .add("view_id", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("product_id", StringType(), True)
    .add("timestamp", TimestampType(), True)
    .add("view_duration", IntegerType(), True)
)
productViewDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", "ecommerce_product_views")
    .option("startingOffsets", "earliest")
    .load()
    # Decode the Kafka value bytes and unpack the JSON payload into columns.
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", productViewSchema).alias("data"))
    .select("data.*")
    # Effectively a no-op: the schema already declares "timestamp" as TimestampType.
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .withWatermark("timestamp", "1 hour")
    .withColumn("processingTime", current_timestamp())
    # NOTE(review): in Spark a later withWatermark on another column is expected
    # to supersede the earlier one, so the effective watermark here is on
    # processingTime — confirm for the deployed Spark version.
    .withWatermark("processingTime", "2 hours")
)


# Read data from the 'ecommerce_system_logs' topic.
systemLogSchema = StructType([
    StructField("log_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("level", StringType(), True),
    StructField("message", StringType(), True)
])
# Parenthesized chain: a trailing "\\" on each line is a SyntaxError.
systemLogDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", "ecommerce_system_logs")
    .option("startingOffsets", "earliest")
    .load()
    # Decode the Kafka value bytes and unpack the JSON payload into columns.
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", systemLogSchema).alias("data"))
    .select("data.*")
)
systemLogDF = systemLogDF.withColumn("processingTime", current_timestamp())
systemLogDF = systemLogDF.withWatermark("processingTime", "2 hours")


# Read data from the 'ecommerce_user_interactions' topic.
userInteractionSchema = StructType([
    StructField("interaction_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("interaction_type", StringType(), True),
    StructField("details", StringType(), True)
])
# Parenthesized chain: a trailing "\\" on each line is a SyntaxError.
userInteractionDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", "ecommerce_user_interactions")
    .option("startingOffsets", "earliest")
    .load()
    # Decode the Kafka value bytes and unpack the JSON payload into columns.
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", userInteractionSchema).alias("data"))
    .select("data.*")
)
userInteractionDF = userInteractionDF.withColumn("processingTime", current_timestamp())
userInteractionDF = userInteractionDF.withWatermark("processingTime", "2 hours")


# Customer demographics and account activity: for each gender, the number of
# customers and the most recent login, bucketed into 1-day windows of
# last_login. `count` and `max` are the pyspark.sql.functions versions
# (star import), not the Python builtins.
customerAnalysisDF = customerDF.groupBy(
    window(col("last_login"), "1 day"),
    "gender",
).agg(
    count("customer_id").alias("total_customers"),
    max("last_login").alias("last_activity"),
)
# Product popularity and stock status: average price and total stock per
# category in 1-hour windows of processing time. `sum` is
# pyspark.sql.functions.sum (star import), not the builtin.
# Parenthesized chain: a trailing "\\" on each line is a SyntaxError.
productAnalysisDF = (
    productDF
    .groupBy(
        window(col("processingTime"), "1 hour"),  # processingTime is the watermarked column
        "category",
    )
    .agg(
        avg("price").alias("average_price"),
        sum("stock_quantity").alias("total_stock"),
    )
    # Flatten the window struct into two top-level columns.
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("category"),
        col("average_price"),
        col("total_stock"),
    )
)


# Sales analysis: per product and 1-hour processing-time window, the number
# of transactions, units sold and (approximately) distinct buyers.
# approx_count_distinct is used because an exact distinct count is not
# supported in streaming aggregations.
# Parenthesized chain: a trailing "\\" on each line is a SyntaxError.
salesAnalysisDF = (
    transactionDF
    .groupBy(
        window(col("processingTime"), "1 hour"),  # processingTime is the watermarked column
        "product_id",
    )
    .agg(
        count("transaction_id").alias("number_of_sales"),
        sum("quantity").alias("total_quantity_sold"),
        approx_count_distinct("customer_id").alias("unique_customers"),
    )
    # Flatten the window struct into two top-level columns.
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("product_id"),
        col("number_of_sales"),
        col("total_quantity_sold"),
        col("unique_customers"),
    )
)


# Customer interest in products: view count and mean view duration per
# product in 1-hour windows of the view's own event timestamp.
# Parenthesized chain: a trailing "\\" on each line is a SyntaxError.
productViewsAnalysisDF = (
    productViewDF
    # productViewDF's most recent watermark is on processingTime. Append-mode
    # windowed aggregation needs the watermark on the column the window is
    # built from, so it is re-declared on "timestamp" here.
    .withWatermark("timestamp", "2 hours")
    .groupBy(
        window(col("timestamp"), "1 hour"),
        "product_id",
    )
    .agg(
        count("view_id").alias("total_views"),
        avg("view_duration").alias("average_view_duration"),
    )
    # Flatten the window struct into two top-level columns.
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("product_id"),
        col("total_views"),
        col("average_view_duration"),
    )
)


# User interaction analysis: interaction count and (approximately) distinct
# users per interaction type in 1-hour windows of the event timestamp.
# Parenthesized chain: a trailing "\\" on each line is a SyntaxError.
interactionAnalysisDF = (
    userInteractionDF
    # userInteractionDF is watermarked on processingTime. Re-declare the
    # watermark on "timestamp", the column this window is built from.
    .withWatermark("timestamp", "2 hours")
    .groupBy(
        window(col("timestamp"), "1 hour"),
        "interaction_type",
    )
    .agg(
        count("interaction_id").alias("total_interactions"),
        approx_count_distinct("customer_id").alias("unique_users_interacted"),
    )
    # Flatten the window struct into two top-level columns.
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("interaction_type"),
        col("total_interactions"),
        col("unique_users_interacted"),
    )
)


def writeToElasticsearch(df, index_name, checkpoint_root="/opt/bitnami/spark/checkpoint"):
    """Start a streaming query that appends each micro-batch of *df* to Elasticsearch.

    Args:
        df: Streaming DataFrame to sink.
        index_name: Target index; documents are written to ``{index_name}/doc``.
        checkpoint_root: Directory holding the query checkpoint
            (``{checkpoint_root}/{index_name}``). Kafka offsets and aggregation
            state therefore survive a restart instead of living in a temporary
            directory.

    Returns:
        The started ``StreamingQuery``.
    """
    def write_and_log(batch_df: DataFrame, batch_id: int):
        """Write one micro-batch to Elasticsearch, skipping empty batches.

        Failures are logged with a traceback and NOT re-raised. The query keeps
        running, but the failed batch is not retried.
        """
        logger.info("Attempting to write batch %s to Elasticsearch index %s.", batch_id, index_name)
        try:
            if batch_df.isEmpty():
                logger.info("Batch %s is empty. Skipping write.", batch_id)
                return
            logger.info("Batch %s has data. Writing to Elasticsearch.", batch_id)
            (
                batch_df.write
                .format("org.elasticsearch.spark.sql")
                .option("es.resource", f"{index_name}/doc")
                .option("es.nodes", "elasticsearch")
                .option("es.port", "9200")
                .option("es.nodes.wan.only", "true")
                # The default SaveMode (ErrorIfExists) makes the ES connector
                # reject writes to an index that already holds data, so every
                # batch after the first would fail. Append explicitly.
                .mode("append")
                .save()
            )
            logger.info("Batch %s written successfully.", batch_id)
        except Exception:
            logger.exception("Error writing batch %s to Elasticsearch index %s", batch_id, index_name)

    return (
        df.writeStream
        .outputMode("append")
        # The checkpoint belongs on the streaming query; on the per-batch
        # writer (where it used to be) it has no effect.
        .option("checkpointLocation", f"{checkpoint_root}/{index_name}")
        .foreachBatch(write_and_log)
        .start()
    )
# Start one streaming query per analysis, each sinking into its own index.
writeToElasticsearch(customerAnalysisDF, "customer_analysis")
writeToElasticsearch(productAnalysisDF, "product_analysis")
writeToElasticsearch(salesAnalysisDF, "sales_analysis")
writeToElasticsearch(productViewsAnalysisDF, "product_views_analysis")
writeToElasticsearch(interactionAnalysisDF, "interaction_analysis")

# Block until any query terminates; a query that failed re-raises its
# exception here.
spark.streams.awaitAnyTermination()

```
version: "3.8"
services:
  ####################
  # Python Data generator
  ####################
  data-generator:
    build: ./python-data-generator
    container_name: data-generator
    depends_on:
      - broker
      - zookeeper
      - init-kafka
      # - schema-registry
      # - connect
    networks:
      - backend
    environment:
      - KAFKA_BROKER=broker:${BROKER_INTERNAL_PORT}
    restart: on-failure
  ####################
  # Elasticsearch
  ####################
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2
    container_name: elasticsearch
    environment:
      - "discovery.type=single-node"
      - ES_JAVA_OPTS=-Xms4g -Xmx4g
    ports:
      - "9200:9200"
    networks:
      - backend
    volumes:
      - $PWD/esdata1:/usr/share/elasticsearch/data
    healthcheck:
      test: ["CMD-SHELL", "curl --silent --fail localhost:9200/_cluster/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3
    restart: unless-stopped
  ####################
  # Kibana
  ####################
  kibana:
    image: docker.elastic.co/kibana/kibana:7.13.2
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_URL=http://elasticsearch:9200
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - backend
  ####################
  # Apache Spark Master Node
  ####################
  spark_master:
    image: bitnami/spark:3
    command: /opt/bitnami/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1,org.elasticsearch:elasticsearch-spark-30_2.12:7.13.1 /opt/bitnami/spark/app/spark-processing.py
    container_name: spark_master
    ports:
      - "8077:8080"
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - $PWD/spark-processing:/opt/bitnami/spark/app
      - $PWD/spark-checkpoint:/opt/bitnami/spark/checkpoint
    networks:
      - backend
    depends_on:
      - zookeeper
      - broker
      - elasticsearch
      - data-generator
  ####################
  # zookeeper
  ####################
  zookeeper:
    image: confluentinc/cp-zookeeper:${KAFKA_VERSION}
    hostname: zookeeper
    container_name: zookeeper
    networks:
      - backend
    ports:
      - ${ZOOKEEPER_CLIENT_PORT}:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: ${ZOOKEEPER_CLIENT_PORT}
      ZOOKEEPER_TICK_TIME: ${ZOOKEEPER_TICK_TIME}
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: WARN
      ZOOKEEPER_TOOLS_LOG4J_LOGLEVEL: ERROR
    volumes:
      - $PWD/kafka-ce/zk/data:/var/lib/zookeeper/data
      - $PWD/kafka-ce/zk/txn-logs:/var/lib/zookeeper/log
    restart: always
  ####################
  # broker
  ####################
  broker:
    image: confluentinc/cp-kafka:${KAFKA_VERSION}
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    networks:
      - backend
    ports:
      - ${BROKER_EXTERNAL_PORT}:${BROKER_EXTERNAL_PORT}
      - ${BROKER_LOCAL_PORT}:${BROKER_LOCAL_PORT}
      - ${BROKER_JMX_PORT}:9101
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:${ZOOKEEPER_CLIENT_PORT}
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:${BROKER_INTERNAL_PORT},PLAINTEXT_HOST://localhost:${BROKER_LOCAL_PORT}
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 1000
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: ${REPLICATION_FACTOR}
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      KAFKA_DELETE_TOPIC_ENABLE: true
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: broker
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:${SCHEMA_REGISTRY_PORT}
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
      KAFKA_LOG_RETENTION_MS: -1
      KAFKA_LOG4J_LOGGERS: org.apache.zookeeper=WARN,org.apache.kafka=WARN,kafka=WARN,kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN
    volumes:
      - $PWD/kafka-ce/broker/data:/var/lib/kafka/data
    restart: always
  ####################
  # broker2
  ####################
  broker2:
    image: confluentinc/cp-kafka:${KAFKA_VERSION}
    hostname: broker2
    container_name: broker2
    depends_on:
      - zookeeper
      - broker
    networks:
      - backend
    ports:
      - ${BROKER2_EXTERNAL_PORT}:${BROKER2_EXTERNAL_PORT}
      - ${BROKER2_LOCAL_PORT}:${BROKER2_LOCAL_PORT}
      - ${BROKER2_JMX_PORT}:9101
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:${ZOOKEEPER_CLIENT_PORT}
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:${BROKER2_INTERNAL_PORT},PLAINTEXT_HOST://localhost:${BROKER2_LOCAL_PORT}
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 1000
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: ${REPLICATION_FACTOR}
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: broker2
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:${SCHEMA_REGISTRY_PORT}
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
      KAFKA_LOG_RETENTION_MS: -1
      KAFKA_LOG4J_LOGGERS: org.apache.zookeeper=WARN,org.apache.kafka=WARN,kafka=WARN,kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN
    volumes:
      - $PWD/kafka-ce/broker2/data:/var/lib/kafka/data
    restart: always
  ####################
  # schema-registry
  ####################
  schema-registry:
    image: confluentinc/cp-schema-registry:${KAFKA_VERSION}
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    networks:
      - backend
    ports:
      - ${SCHEMA_REGISTRY_PORT}:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:${BROKER_INTERNAL_PORT}'
      SCHEMA_REGISTRY_LISTENERS: http://${SCHEMA_REGISTRY_PUBLIC_HOST}:${SCHEMA_REGISTRY_PORT}
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN
      SCHEMA_REGISTRY_TOOLS_LOG4J_LOGLEVEL: ERROR
    volumes:
      - $PWD/kafka-ce/schema-registry/data:/data
    restart: always
  ####################
  # connect
  ####################
  connect:
    image: confluentinc/cp-kafka-connect:${KAFKA_VERSION}
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    networks:
      - backend
    ports:
      - ${CONNECT_PORT}:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:${BROKER_INTERNAL_PORT}
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: connect-distributed-group
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 1000
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:${SCHEMA_REGISTRY_PORT}'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:${SCHEMA_REGISTRY_PORT}
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-${KAFKA_VERSION}.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      CONNECT_LOG4J_ROOT_LOGLEVEL: WARN
      CONNECT_TOOLS_LOG4J_LOGLEVEL: ERROR
    volumes:
      - $PWD/kafka-ce/connect/plugins:/usr/share/confluent-hub-components
      - $PWD/kafka-ce/connect/data:/data
    restart: always
  ####################
  # ksqldb-server
  ####################
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:${KAFKA_VERSION}
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - zookeeper
      - broker
      - connect
    networks:
      - backend
    ports:
      - ${KSQLDB_PORT}:8088
    environment:
      KSQL_CONFIG_DIR: /etc/ksql
      KSQL_BOOTSTRAP_SERVERS: broker:${BROKER_INTERNAL_PORT}
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: http://0.0.0.0:${KSQLDB_PORT}
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:${SCHEMA_REGISTRY_PORT}
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
      KSQL_KSQL_CONNECT_URL: http://connect:${CONNECT_PORT}
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: ${REPLICATION_FACTOR}
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: true
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: true
      KSQL_LOG4J_ROOT_LOGLEVEL: WARN
      KSQL_TOOLS_LOG4J_LOGLEVEL: ERROR
    restart: always
  ####################
  # ksqldb-cli
  ####################
  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:${KAFKA_VERSION}
    hostname: ksqldb-cli
    container_name: ksqldb-cli
    depends_on:
      - zookeeper
      - broker
      - connect
      - ksqldb-server
    networks:
      - backend
    entrypoint: /bin/sh
    tty: true
    volumes:
      - $PWD/kafka-ce/ksqldb-cli/scripts:/data/scripts
    restart: always    
  ####################
  # rest-proxy
  ####################
  rest-proxy:
    image: confluentinc/cp-kafka-rest:${KAFKA_VERSION}
    hostname: rest-proxy
    container_name: rest-proxy
    depends_on:
      - broker
      # - schema-registry
    networks:
      - backend
    ports:
      - ${REST_PROXY_PORT}:8082
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:${BROKER_INTERNAL_PORT}'
      KAFKA_REST_LISTENERS: http://0.0.0.0:${REST_PROXY_PORT}
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry:${SCHEMA_REGISTRY_PORT}
      KAFKA_REST_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_REST_TOOLS_LOG4J_LOGLEVEL: ERROR
    restart: always
    
  ####################
  # kafka-ui
  ####################
  kafka-ui:
    hostname: kafka-ui
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - ${KAFKA_UI_PORT}:8080
    networks:
      - backend
    depends_on:
      - zookeeper
      - broker
      # - schema-registry
      # - connect
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:${BROKER_INTERNAL_PORT}
      KAFKA_CLUSTERS_0_METRICS_PORT: ${KAFKA_UI_METRIC_PORT}
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:${SCHEMA_REGISTRY_PORT}
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:${CONNECT_PORT}
      KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb-server:${KSQLDB_PORT}
      KAFKA_CLUSTERS_0_READONLY: ${KAFKA_UI_READONLY}
      AUTH_TYPE: ${KAFKA_UI_AUTH_TYPE}
      SPRING_SECURITY_USER_NAME: ${KAFKA_UI_SPRING_SECURITY_USER_NAME}
      SPRING_SECURITY_USER_PASSWORD: ${KAFKA_UI_SPRING_SECURITY_USER_PASSWORD}
    restart: always
  ####################
  # init-kafka
  ####################
  init-kafka:
    image: confluentinc/cp-kafka:${KAFKA_VERSION}
    container_name: init-kafka
    depends_on:
      - zookeeper
      - broker
      # - schema-registry
      # - connect
    networks:
      - backend
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      kafka-topics --bootstrap-server broker:${BROKER_INTERNAL_PORT} --list
      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server broker:${BROKER_INTERNAL_PORT} --create --if-not-exists --topic ecommerce_customers --replication-factor ${REPLICATION_FACTOR} --partitions ${PARTITIONS}
      kafka-topics --bootstrap-server broker:${BROKER_INTERNAL_PORT} --create --if-not-exists --topic ecommerce_products --replication-factor ${REPLICATION_FACTOR} --partitions ${PARTITIONS}
      kafka-topics --bootstrap-server broker:${BROKER_INTERNAL_PORT} --create --if-not-exists --topic ecommerce_transactions --replication-factor ${REPLICATION_FACTOR} --partitions ${PARTITIONS}
      kafka-topics --bootstrap-server broker:${BROKER_INTERNAL_PORT} --create --if-not-exists --topic ecommerce_user_interactions --replication-factor ${REPLICATION_FACTOR} --partitions ${PARTITIONS}
      kafka-topics --bootstrap-server broker:${BROKER_INTERNAL_PORT} --create --if-not-exists --topic ecommerce_product_views --replication-factor ${REPLICATION_FACTOR} --partitions ${PARTITIONS}
      kafka-topics --bootstrap-server broker:${BROKER_INTERNAL_PORT} --create --if-not-exists --topic ecommerce_system_logs --replication-factor ${REPLICATION_FACTOR} --partitions ${PARTITIONS}
      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server broker:${BROKER_INTERNAL_PORT} --list
      "
################################################################################
#
# networks
# - backend
#
################################################################################
networks:
  backend:
    name: backend
```

```
#!/bin/bash

# Start from a clean slate: DELETE all data left over from previous runs
# (Elasticsearch, Kafka/ZooKeeper, Spark app copy and checkpoints).
# -f keeps the very first run, when none of these exist yet, free of errors.
sudo rm -rf esdata1 kafka-ce spark-processing spark-checkpoint

# Function to create volumes for various services.
# Usage: create_volumes SERVICE DIR...
# Creates each DIR (with parents), gives it to the invoking user and group,
# and makes it world-writable (o+wrx) — presumably so containers running
# under a different UID can write to the bind mount.
create_volumes() {
    local service=$1
    shift
    echo "Creating volumes for ${service} ..."
    local dir
    for dir in "$@"
    do
        echo "$dir"
        mkdir -p "$dir"
        # One chown call sets owner and group together.
        sudo chown -R "$(id -u):$(id -g)" "$dir"
        sudo chmod -R u+rwX,g+rX,o+wrx "$dir"
        echo "$dir volume is created."
    done
    echo "Volumes for ${service} created ✅"
    echo
}
# Load environment variables (ports, versions, replication settings) from .env
source .env

# Create host directories for the bind-mounted volumes
create_volumes zookeeper kafka-ce/zk/data kafka-ce/zk/txn-logs
create_volumes brokers kafka-ce/broker/data kafka-ce/broker2/data kafka-ce/broker3/data kafka-ce/broker4/data
create_volumes schema-registry kafka-ce/schema-registry/data
create_volumes connect kafka-ce/connect/data kafka-ce/connect/plugins
create_volumes ksqldb-cli kafka-ce/ksqldb-cli/scripts
create_volumes filepulse kafka-ce/connect/data/filepulse/xml
create_volumes elasticsearch esdata1
create_volumes spark_master spark-processing
create_volumes spark_master spark-checkpoint

# spark_master runs the job from the bind-mounted ./spark-processing directory
cp spark-processing.py spark-processing
# docker-compose.yaml builds its volume paths from $PWD
export PWD=$(pwd)

# Start all services using Docker Compose
echo "Starting all services ..."
docker compose -f docker-compose.yaml up -d

# Seconds each readiness probe may wait
timeout=600
echo ''

# wait_for NAME COMMAND...
# Run a readiness probe and abort the script if it fails, instead of printing
# a misleading "ready" message. No -it on docker exec: the probes need no
# TTY, and -t fails when the script is run non-interactively.
wait_for() {
    local name=$1
    shift
    echo "Wait for ${name} ..."
    if ! "$@" > /dev/null; then
        echo "${name} is not ready ❌" >&2
        exit 1
    fi
    echo "${name} is ready ✅"
    echo ''
}

# Check readiness for Zookeeper
zookeeper="zookeeper:${ZOOKEEPER_CLIENT_PORT}"
wait_for "$zookeeper" docker exec zookeeper cub zk-ready "$zookeeper" "$timeout"

# Check readiness for Kafka brokers
for broker in "broker:${BROKER_INTERNAL_PORT}" "broker2:${BROKER2_INTERNAL_PORT}"
do
    wait_for "$broker" docker exec zookeeper cub kafka-ready -b "$broker" 1 "$timeout"
done

# Check readiness for Schema Registry
schema_registry_host="schema-registry"
schema_registry_port="${SCHEMA_REGISTRY_PORT}"
wait_for "${schema_registry_host}:${schema_registry_port}" \
    docker exec zookeeper cub sr-ready "$schema_registry_host" "$schema_registry_port" "$timeout"

# Check readiness for Kafka Connect (add connect2 connect3 here when scaled out)
for connect_host in connect
do
    connect_port="${CONNECT_PORT}"
    wait_for "${connect_host}:${connect_port}" \
        docker exec zookeeper cub connect-ready "$connect_host" "$connect_port" "$timeout"
done

echo "Kafka cluster is ready ✅"
```