In [None]:
from kafka import KafkaConsumer
import json
kafka_server_ip = "172.17.249.145"


In [None]:
class KafkaConsumerClient():
    
    def create_consumer(self, topic_name):
        '''
        func to create consumer without serializer 
        '''
        
        # kafka consumer
        self.consumer = KafkaConsumer(topic_name,
                                      bootstrap_servers=[f'{kafka_server_ip}:9092'],
                                      auto_offset_reset='earliest',
                                      enable_auto_commit=True,
                                      auto_commit_interval_ms =  5000,
                                      fetch_max_bytes = 128,
                                      max_poll_records = 100,
                                     )
        
    def create_consumer_json(self, topic_name):
        '''
        func to create consumer with json serializer'''
        
        # Define Kafka consumer
        self.consumer = KafkaConsumer(topic_name,   
                                      bootstrap_servers=[f'{kafka_server_ip}:9092'],
                                      auto_offset_reset='earliest',
                                      enable_auto_commit=True,
                                      auto_commit_interval_ms =  5000,
                                      fetch_max_bytes = 128,
                                      max_poll_records = 100,
                                      value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                                      key_deserializer=lambda m: m.decode('utf-8'),
                                      security_protocol="PLAINTEXT")
              
    
    def kafka_consumer(self):
        '''
        Func to publish data streams
        to kafka cluster
        '''
        
        for message in self.consumer:
            print(message.value)
            
    def kafka_consumer_with_timeout(self, minutes=2):
        
        # start consuming messages
        try:
            while True:
                messages = self.consumer.poll(timeout_ms = 1000*60*minutes) # wait for messages
                if not messages:
                    raise TimeoutError(f"No messages received in the last {minutes} minutes")

                for _, message in messages.items():
                    # process message
                    print(message[0].value)

        except KeyboardInterrupt:
            pass

        finally:
            self.consumer.close()
        

In [None]:
kafkaconsumer_client = KafkaConsumerClient()
# create consumer without serializer
kafkaconsumer_client.create_consumer("topic1")
# consumer without polling
kafkaconsumer_client.kafka_consumer()

In [None]:
kafkaconsumer_client.consumer.close()

In [None]:
kafkaconsumer_client = KafkaConsumerClient()
# create consumer without serializer
kafkaconsumer_client.create_consumer("topic1")
# consumer with polling timeout
kafkaconsumer_client.kafka_consumer_with_timeout(0.5)

In [None]:
kafkaconsumer_client = KafkaConsumerClient()
# create consumer with serializer
kafkaconsumer_client.create_consumer_json("topic2")
# consumer with polling timeout
kafkaconsumer_client.kafka_consumer_with_timeout(0.5)