diff --git a/supporting-blog-content/elasticsearch-through-apache-kafka/.gitignore b/supporting-blog-content/elasticsearch-through-apache-kafka/.gitignore
new file mode 100644
index 00000000..1eadfc7b
--- /dev/null
+++ b/supporting-blog-content/elasticsearch-through-apache-kafka/.gitignore
@@ -0,0 +1,2 @@
+/docker/elasticsearch/
+/elasticsearch/
diff --git a/supporting-blog-content/elasticsearch-through-apache-kafka/README.md b/supporting-blog-content/elasticsearch-through-apache-kafka/README.md
new file mode 100644
index 00000000..ffb7567e
--- /dev/null
+++ b/supporting-blog-content/elasticsearch-through-apache-kafka/README.md
@@ -0,0 +1,43 @@
+# Data Ingestion with Apache Kafka and Elasticsearch
+
+This project demonstrates a data ingestion pipeline using **Apache Kafka** and **Elasticsearch** with **Python**. Messages are produced and consumed through Kafka, indexed in Elasticsearch, and visualized in Kibana.
+
+## Project Structure
+
+The infrastructure is managed with **Docker Compose**, which starts the following services:
+
+- **Zookeeper**: Manages and coordinates the Kafka brokers.
+- **Kafka**: Distributes and stores the messages.
+- **Elasticsearch**: Stores and indexes the messages for analysis.
+- **Kibana**: Visualization interface for the data stored in Elasticsearch.
+
+The **Producer** code sends messages to Kafka, while the **Consumer** reads and indexes these messages in Elasticsearch.
+
+---
+
+## Prerequisites
+
+- **Docker and Docker Compose**: Ensure both are installed on your machine.
+- **Python 3.x**: Required to run the Producer and Consumer scripts; install the dependencies with `pip install -r requirements.txt`.
+
+---
+
+## Running the Producer and Consumer
+
+### Producer
+The `kafka_producer.py` script sends messages in batches to the `logs` topic in Kafka.
+It uses the `batch_size` and `linger_ms` settings to optimize sending.
+
+````
+python kafka_producer.py
+````
+
+### Consumer
+The `kafka_consumer.py` script reads messages from the `logs` topic and indexes them in Elasticsearch. It consumes messages in batches and automatically commits the offsets of processed messages.
+
+````
+python kafka_consumer.py
+````
+
+## Data Verification in Kibana
+After running the `kafka_producer.py` and `kafka_consumer.py` scripts, access Kibana at http://localhost:5601 to visualize the indexed data. Messages sent by the producer and processed by the consumer will be in the `logs` index in Elasticsearch.
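+
+## Optional: Ingesting Through Kafka Connect
+
+The `kafka-connect/es.properties` file contains an equivalent sink configuration for the Kafka Connect worker started by Docker Compose. As a sketch (assuming the worker's REST API is reachable at http://localhost:8083), the configuration can be registered like this:
+
+````
+curl -X PUT http://localhost:8083/connectors/elasticsearch-sink-connector/config \
+  -H "Content-Type: application/json" \
+  -d '{
+    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
+    "topics": "logs",
+    "connection.url": "http://elasticsearch:9200",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "schema.ignore": "true",
+    "key.ignore": "true"
+  }'
+````
+
+Once registered, the connector indexes messages from the `logs` topic into Elasticsearch without the Python consumer.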
diff --git a/supporting-blog-content/elasticsearch-through-apache-kafka/docker-compose.yml b/supporting-blog-content/elasticsearch-through-apache-kafka/docker-compose.yml
new file mode 100644
index 00000000..710e8a06
--- /dev/null
+++ b/supporting-blog-content/elasticsearch-through-apache-kafka/docker-compose.yml
@@ -0,0 +1,89 @@
+version: "3"
+
+services:
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:latest
+    container_name: zookeeper
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+
+  kafka:
+    image: confluentinc/cp-kafka:latest
+    container_name: kafka
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+      - "9094:9094"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+  kafka-connect:
+    image: confluentinc/cp-kafka-connect-base:6.0.0
+    container_name: kafka-connect
+    platform: linux/amd64
+    depends_on:
+      - zookeeper
+      - kafka
+    ports:
+      - 8083:8083
+    environment:
+      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
+      CONNECT_REST_PORT: 8083
+      CONNECT_GROUP_ID: kafka-connect
+      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
+      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
+      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
+      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
+      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
+      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
+      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
+      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
+      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
+      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
+    volumes:
+      - $PWD/data:/data
+    command:
+      - bash
+      - -c
+      - |
+        echo "Installing Connector"
+        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:10.0.1
+        #
+        echo "Launching Kafka Connect worker"
+        /etc/confluent/docker/run &
+        #
+        sleep infinity
+
+  elasticsearch:
+    image: docker.elastic.co/elasticsearch/elasticsearch:8.15.1
+    container_name: elasticsearch-8.15.1
+    environment:
+      - node.name=elasticsearch
+      - xpack.security.enabled=false
+      - discovery.type=single-node
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+    volumes:
+      - ./elasticsearch:/usr/share/elasticsearch/data
+    ports:
+      - 9200:9200
+
+  kibana:
+    image: docker.elastic.co/kibana/kibana:8.15.1
+    container_name: kibana-8.15.1
+    ports:
+      - 5601:5601
+    environment:
+      ELASTICSEARCH_URL: http://elasticsearch:9200
+      ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'
diff --git a/supporting-blog-content/elasticsearch-through-apache-kafka/kafka-connect/es.properties b/supporting-blog-content/elasticsearch-through-apache-kafka/kafka-connect/es.properties
new file mode 100644
index 00000000..a671d56e
--- /dev/null
+++ b/supporting-blog-content/elasticsearch-through-apache-kafka/kafka-connect/es.properties
@@ -0,0 +1,9 @@
+name=elasticsearch-sink-connector
+topics=logs
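+# Commentary added for this walkthrough: the sink subscribes to the "logs"
+# topic above and writes each record as a JSON document to an Elasticsearch
+# index named after the topic.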
+connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
+connection.url=http://elasticsearch:9200
+type.name=_doc
+value.converter=org.apache.kafka.connect.json.JsonConverter
+value.converter.schemas.enable=false
+schema.ignore=true
+key.ignore=true
\ No newline at end of file
diff --git a/supporting-blog-content/elasticsearch-through-apache-kafka/kafka_consumer.py b/supporting-blog-content/elasticsearch-through-apache-kafka/kafka_consumer.py
new file mode 100644
index 00000000..4847c1c2
--- /dev/null
+++ b/supporting-blog-content/elasticsearch-through-apache-kafka/kafka_consumer.py
@@ -0,0 +1,49 @@
+from kafka import KafkaConsumer
+from elasticsearch import Elasticsearch, helpers
+import json
+
+es = Elasticsearch(["http://localhost:9200"])
+
+consumer = KafkaConsumer(
+    "logs",  # Topic name
+    bootstrap_servers=["localhost:9092"],
+    auto_offset_reset="latest",  # Read from the latest offset when the group has no stored offset
+    enable_auto_commit=True,  # Automatically commit offsets after processing
+    group_id="log_consumer_group",  # Consumer group used for offset tracking
+    max_poll_records=10,  # Maximum number of messages per batch
+    fetch_max_wait_ms=2000,  # Maximum wait time to form a batch (in ms)
+)
+
+
+def create_bulk_actions(logs):
+    # Build one bulk indexing action per log entry
+    for log in logs:
+        yield {
+            "_index": "logs",
+            "_source": {
+                "level": log["level"],
+                "message": log["message"],
+                "timestamp": log["timestamp"],
+            },
+        }
+
+
+if __name__ == "__main__":
+    try:
+        print("Starting message consumption...")
+        while True:
+            messages = consumer.poll(timeout_ms=1000)
+
+            # Process each polled batch of records, grouped by partition
+            for _, records in messages.items():
+                logs = [json.loads(record.value) for record in records]
+                bulk_actions = create_bulk_actions(logs)
+                response = helpers.bulk(es, bulk_actions)
+                print(f"Indexed {response[0]} logs.")
+    except Exception as e:
+        print(f"Error: {e}")
+    finally:
+        consumer.close()
+        print("Finished")
diff --git a/supporting-blog-content/elasticsearch-through-apache-kafka/kafka_producer.py b/supporting-blog-content/elasticsearch-through-apache-kafka/kafka_producer.py
new file mode 100644
index 00000000..ab69f152
--- /dev/null
+++ b/supporting-blog-content/elasticsearch-through-apache-kafka/kafka_producer.py
@@ -0,0 +1,62 @@
+from datetime import datetime
+
+from kafka import KafkaProducer
+import json
+import time
+import logging
+import random
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger("log_producer")
+
+producer = KafkaProducer(
+    bootstrap_servers=["localhost:9092"],  # Kafka server to connect to
+    value_serializer=lambda x: json.dumps(x).encode(
+        "utf-8"
+    ),  # Serializes data as JSON and encodes it to UTF-8 before sending
+    batch_size=16384,  # Maximum batch size in bytes (here, 16 KB) buffered before sending
+    linger_ms=10,  # Maximum delay (in milliseconds) before sending a batch
+    acks="all",  # 'all' waits for all replicas to acknowledge, ensuring durability
+)
+
+
+def generate_log_message():
+    # Generate a timestamp 5 to 10 minutes in the past
+    diff_seconds = random.uniform(300, 600)
+    timestamp = time.time() - diff_seconds
+
+    log_messages = {
+        "INFO": [
+            "User login successful",
+            "Database connection established",
+            "Service started",
+            "Payment processed",
+        ],
+        "WARNING": ["Service stopped", "Payment may not have been processed"],
+        "ERROR": ["User login failed", "Database connection failed", "Payment failed"],
+        "DEBUG": ["Debugging user login flow", "Debugging database connection"],
+    }
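+
+    # Added note: the level is chosen uniformly at random, then a message for
+    # that level, so each level is equally likely regardless of list length.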
user login flow", "Debugging database connection"], + } + + level = random.choice(list(log_messages.keys())) + + message = random.choice(log_messages[level]) + + log_entry = {"level": level, "message": message, "timestamp": timestamp} + + return log_entry + + +def send_log_batches(topic, num_batches=5, batch_size=10): + for i in range(num_batches): + logger.info(f"Sending batch {i + 1}/{num_batches}") + for _ in range(batch_size): + log_message = generate_log_message() + producer.send(topic, value=log_message) + producer.flush() + time.sleep(1) + + +if __name__ == "__main__": + topic = "logs" + send_log_batches(topic) + producer.close() diff --git a/supporting-blog-content/elasticsearch-through-apache-kafka/requirements.txt b/supporting-blog-content/elasticsearch-through-apache-kafka/requirements.txt new file mode 100644 index 00000000..dae7c024 --- /dev/null +++ b/supporting-blog-content/elasticsearch-through-apache-kafka/requirements.txt @@ -0,0 +1,2 @@ +kafka-python==2.0.2 +elasticsearch==7.10.0 \ No newline at end of file