In [1]:
import json
import logging
import time
import logging.config
from configparser import ConfigParser
import argparse
import os
import subprocess

from confluent_kafka import Producer, KafkaError, Consumer
from confluent_kafka.admin import AdminClient, NewTopic

# Kafka Producer

In [2]:
logger = logging.getLogger(__name__)

class ProducerServer:
    """
    Basic Kafka producer class.

    Reads newline-delimited JSON records from a file on disk and publishes
    each record as an individual message to a Kafka topic, pausing
    ``time_interval`` seconds between messages.
    """

    def __init__(self, conf, time_interval):
        """
        Build the admin client and producer from the ``[kafka]`` config section.

        :param conf: ConfigParser-like object exposing ``get``/``getint``; the keys
            ``topic``, ``input_file``, ``bootstrap_servers``, ``num_partitions``,
            ``replication_factor`` and ``progress_interval`` are read from
            section ``kafka``.
        :param time_interval: seconds to sleep after each produced message.
        """
        self.conf = conf
        self.topic = self.conf.get("kafka", "topic")
        self.input_file = self.conf.get("kafka", "input_file")
        self.bootstrap_servers = self.conf.get("kafka", "bootstrap_servers")
        self.num_partitions = self.conf.getint("kafka", "num_partitions")
        self.replication_factor = self.conf.getint("kafka", "replication_factor")
        self.progress_interval = self.conf.getint("kafka", "progress_interval")
        self.admin_client = AdminClient({"bootstrap.servers": self.bootstrap_servers})
        self.producer = Producer({"bootstrap.servers": self.bootstrap_servers})
        self.time_interval = time_interval

    def create_topic(self):
        """
        Check if Kafka topic already exists. If not, create it, else continue.

        Creation failures are logged at CRITICAL level and not re-raised.
        """
        if self.topic not in self.admin_client.list_topics().topics:
            futures = self.admin_client.create_topics([NewTopic(topic=self.topic,
                                                                num_partitions=self.num_partitions,
                                                                replication_factor=self.replication_factor)])

            for _topic, future in futures.items():
                try:
                    future.result()
                    logger.info(f"Created topic: {_topic}")
                except Exception as err:
                    # The admin futures raise KafkaException (which wraps a KafkaError),
                    # so catching KafkaError alone would let creation failures escape.
                    logger.critical(f"Failed to create topic {_topic}: {err}")
        else:
            logger.info(f"Topic {self.topic} already exists")

    def generate_data(self):
        """
        Read input JSON file from disk and produce individual serialized rows to Kafka.

        The file is expected to hold one JSON document per line; blank lines are
        skipped. After every ``progress_interval`` produced rows a progress message
        is logged at DEBUG level. The producer is flushed once the file is exhausted.
        """
        with open(self.input_file, "r", encoding="utf8") as f:
            line_count = 0
            for line in f:
                # tolerate blank lines (e.g. a trailing newline at end of file)
                if not line.strip():
                    continue

                data = json.loads(line)

                # trigger delivery report callbacks from previous produce calls
                self.producer.poll(timeout=2)

                # serialize Python dict to UTF-8 encoded bytes
                msg = self.serialize_json(data)
                logger.debug(f"Serialized JSON data:\n {msg}")

                # send data to Kafka
                self.producer.produce(topic=self.topic, value=msg, callback=self.delivery_callback)

                # log progress; the counter must accumulate for the modulo check to work
                line_count += 1
                if self.progress_interval > 0 and line_count % self.progress_interval == 0:
                    logger.debug(f"Processed {line_count} rows of data")

                # wait time_interval seconds before reading the next line
                time.sleep(self.time_interval)

            # make sure all messages are delivered before closing producer
            logger.debug("Flushing producer")
            self.producer.flush()

    @staticmethod
    def serialize_json(json_data):
        """
        Serialize a Python object to JSON and return it as UTF-8 encoded bytes.
        """
        return json.dumps(json_data).encode("utf-8")

    @staticmethod
    def delivery_callback(err, msg):
        """
        Delivery report callback passed to ``produce``.

        :param err: error object, or None when the message was delivered.
        :param msg: the message the report refers to.
        """
        if err is not None:
            logger.error(f"Failed to deliver message: {err}")
        else:
            logger.info(f"Successfully produced message to topic {msg.topic()}")
            # visual separator between delivery reports in the notebook output
            print("---")

    def close(self):
        """
        Convenience method to flush producer
        """
        logger.debug("Flushing producer")
        self.producer.flush()

In [3]:
def run_cmd(args_list):
    """
    Execute an external command and report how it went.

    :param args_list: program name followed by its arguments, as a list.
    :return: the string "Command executed successfully!" when the exit code is 0;
        otherwise a tuple ``(exit_code, stdout_bytes, stderr_bytes)``.
    """
    child = subprocess.Popen(
        args_list,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() waits for the process to end and collects both streams
    captured_out, captured_err = child.communicate()
    exit_code = child.returncode

    if exit_code == 0:
        return "Command executed successfully!"

    return exit_code, captured_out, captured_err

def run_kafka_producer(time_interval):
    """
    Create Kafka producer, check if relevant topic exists (if not create it) and start producing messages

    Configuration is read from ``test.cfg`` inside the directory named by the
    module-level ``cur_path`` variable, which must be defined before calling.

    :param time_interval: seconds to wait between two produced messages.
    """

    # load config
    config = ConfigParser()
    config.read(os.path.join(cur_path, "test.cfg"))

    # start kafka producer
    logger.info("Starting Kafka Producer")
    producer = ProducerServer(config, time_interval)

    # check if topic exists
    logger.info("Creating topic...")
    producer.create_topic()

    # generate data
    logger.info("Starting to generate data...")

    try:
        producer.generate_data()
    except KeyboardInterrupt:
        # use the module logger like the rest of this function, not the root logger
        logger.info("Stopping Kafka Producer")
    finally:
        # flush on every exit path so queued messages are not dropped when
        # generate_data stops early (interrupt or error)
        producer.close()

if __name__ == "__main__":
    # Directory holding test.cfg and logging.ini; the run_kafka_* helpers
    # read this module-level name to locate their config file.
    cur_path = os.getcwd()

    # Apply the logging configuration before obtaining the logger so that
    # it is picked up by modules.
    logging.config.fileConfig(os.path.join(cur_path, "logging.ini"))
    logger = logging.getLogger(__name__)

    # Delete the topic up front so the run starts from a clean slate.
    clear_topic_res = run_cmd([
        "kafka-topics",
        "--zookeeper", "localhost:2181",
        "--delete",
        "--topic", "streaming_data",
    ])
    logger.info(clear_topic_res)

    # Seconds the producer sleeps between two messages.
    time_interval = 2
    run_kafka_producer(time_interval)

%3|1646984592.838|FAIL|rdkafka#producer-1| [thrd:172.25.0.12:9092/bootstrap]: 172.25.0.12:9092/bootstrap: Connect to ipv4#172.25.0.12:9092 failed: Operation timed out (after 75001ms in state CONNECT)
%3|1646984592.838|FAIL|rdkafka#producer-2| [thrd:172.25.0.12:9092/bootstrap]: 172.25.0.12:9092/bootstrap: Connect to ipv4#172.25.0.12:9092 failed: Operation timed out (after 75000ms in state CONNECT)


# Kafka Consumer

In [5]:
def run_kafka_consumer():
    """
    Create Kafka consumer, subscribe to relevant topic and start consuming messages

    Configuration is read from ``test.cfg`` inside the directory named by the
    module-level ``cur_path`` variable. The function polls in an endless loop and
    only returns after a KeyboardInterrupt; the consumer is closed on every exit path.
    """

    # load config
    config = ConfigParser()
    config.read(os.path.join(cur_path, "test.cfg"))

    # start kafka consumer
    logger.info("Starting Kafka Consumer")
    consumer = Consumer({
        "bootstrap.servers": config.get("kafka", "bootstrap_servers"),
        "group.id": config.get("kafka", "group_id"),
        "auto.offset.reset": config.get("kafka", "auto_offset_reset")
    })

    # subscribe to topic
    consumer.subscribe(topics=[config.get("kafka", "topic")])

    # consume messages
    try:
        while True:
            msg = consumer.poll(timeout=2.0)

            if msg is None:
                logger.debug("No message received")
                continue
            if msg.error():
                logger.error(f"Consumer error: {msg.error()}")
                continue

            logger.info(f"Received message: {msg.value().decode('utf-8')}")

    except KeyboardInterrupt:
        logger.info("Stopping Kafka consumer")
    finally:
        # close on every exit path, not only on interrupt, so the consumer
        # is not leaked when an unexpected error is raised
        consumer.close()

In [6]:
# Start consuming; the poll loop runs until the kernel is interrupted (KeyboardInterrupt).
run_kafka_consumer()

2022-03-11 14:17:52 - DEBUG - No message received
2022-03-11 14:17:53 - INFO - Received message: {"ts": 1538352011000, "userId": "293", "sessionId": 292, "page": "NextSong", "auth": "Logged In", "method": "PUT", "status": 200, "level": "free", "itemInSession": 20, "location": "Corpus Christi, TX", "userAgent": "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36\"", "lastName": "Morales", "firstName": "Joseph", "registration": 1532063507000, "gender": "M", "artist": "Martin Orford", "song": "Grand Designs", "length": 597.55057}
2022-03-11 14:17:53 - INFO - Received message: {"ts": 1538352025000, "userId": "98", "sessionId": 97, "page": "NextSong", "auth": "Logged In", "method": "PUT", "status": 200, "level": "free", "itemInSession": 74, "location": "Houston-The Woodlands-Sugar Land, TX", "userAgent": "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/