In [2]:
# import statements
from time import sleep
from kafka import KafkaConsumer
from kafka import KafkaProducer
import time 
import json

# Name of the Kafka topic that connect_kafka_consumer() subscribes to.
topic = "input_topic"

# connecting to kafka consumer
def connect_kafka_consumer():
    """Create a KafkaConsumer subscribed to the module-level ``topic``.

    The consumer connects to ``localhost:9092`` with ``api_version=(0, 10)``,
    ``auto_offset_reset='earliest'`` and ``consumer_timeout_ms=10000``
    (per kafka-python, iteration stops after 10 s with no new message).

    Returns:
        The ``KafkaConsumer`` instance, or ``None`` if constructing it raised
        an ``Exception`` (the error is printed).
    """
    try:
        return KafkaConsumer(topic,
                             consumer_timeout_ms=10000,
                             auto_offset_reset='earliest',
                             bootstrap_servers=['localhost:9092'],
                             api_version=(0, 10))
    except Exception as ex:
        # Best-effort: report and hand None back to the caller. Only Exception
        # is caught, so KeyboardInterrupt/SystemExit still propagate.
        print('Exception while connecting Kafka')
        print(str(ex))
        return None

# connecting to kafka producer
def connect_kafka_producer():
    """Create a KafkaProducer for the broker at ``localhost:9092``.

    Returns:
        The ``KafkaProducer`` instance, or ``None`` if constructing it raised
        an ``Exception`` (the error is printed).
    """
    try:
        return KafkaProducer(bootstrap_servers=['localhost:9092'],
                             api_version=(0, 10))
    except Exception as ex:
        # Best-effort: report and hand None back to the caller. Only Exception
        # is caught, so KeyboardInterrupt/SystemExit still propagate.
        print('Exception while connecting Kafka.')
        print(str(ex))
        return None
    
# publish aggregated payload to output_topic
def publish_to_output_topic(producer_instance, output, key, topic_name='output_topic'):
    """Send one string message to a Kafka topic, then flush the producer.

    Args:
        producer_instance: Object with ``send(topic, key=..., value=...)`` and
            ``flush()`` methods, e.g. a ``KafkaProducer``.
        output: Message value as a ``str``; encoded to UTF-8 bytes.
        key: Message key. ``None`` sends an unkeyed message; any other value
            is converted with ``str()`` and encoded to UTF-8 bytes.
        topic_name: Destination topic (default ``'output_topic'``).

    Any ``Exception`` (including a ``None`` producer) is printed rather than
    raised, so one failed publish does not stop the caller's loop.
    """
    try:
        # Kafka permits a null key; bytes(None, ...) would raise instead.
        key_bytes = None if key is None else str(key).encode('utf-8')
        value_bytes = output.encode('utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        # Push the buffered record out before reporting success.
        producer_instance.flush()
        print('Message published successfully. Data: ' + str(output))
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))


# consume the message and publish it to output_topic
def _update_user_record(user_messages, user_id, sent_at):
    """Merge one sighting of user_id at sent_at into user_messages and return the record."""
    record = user_messages.get(user_id)
    if record is None:
        record = {
            'userId': user_id,
            'firstSeen': sent_at,
            'lastSeen': sent_at,
        }
        user_messages[user_id] = record
    else:
        # Messages may arrive out of order; keep the earliest and latest times
        # instead of blindly overwriting lastSeen with whatever came last.
        record['firstSeen'] = min(record['firstSeen'], sent_at)
        record['lastSeen'] = max(record['lastSeen'], sent_at)
    return record


def consume_messages(consumer):
    """Aggregate consumed messages per userId and publish each updated record.

    Each message value is decoded as UTF-8 JSON, and its ``userId`` and
    ``metadata.sentAt`` fields are read. The user's running record
    ``{'userId', 'firstSeen', 'lastSeen'}`` is updated and re-published as JSON
    via ``publish_to_output_topic``, keyed by userId.

    Args:
        consumer: Iterable of Kafka messages exposing ``.value`` bytes, e.g.
            the result of ``connect_kafka_consumer()``. ``None`` is reported
            and nothing is consumed.
    """
    if consumer is None:
        print('No Kafka consumer available; nothing to consume.')
        return

    user_messages = {}
    # One producer for the whole run instead of a new connection per message.
    producer = connect_kafka_producer()
    try:
        for message in consumer:
            try:
                data = json.loads(message.value.decode('utf-8'))
                user_id = data['userId']
                record = _update_user_record(
                    user_messages, user_id, data['metadata']['sentAt'])
            except (ValueError, KeyError, TypeError, AttributeError) as ex:
                # Skip one malformed message rather than ending the stream.
                print('Skipping malformed message: ' + str(ex))
                continue
            publish_to_output_topic(producer, json.dumps(record), user_id)
    except Exception as ex:
        print(str(ex))
    finally:
        # Runs on normal exit, errors and KeyboardInterrupt alike.
        if producer is not None:
            producer.close()
    
    
if __name__ == '__main__':
    consumer = connect_kafka_consumer()
    if consumer is None:
        print('Could not create Kafka consumer; exiting.')
    else:
        try:
            consume_messages(consumer)
        finally:
            # Release the consumer's broker connections even when the loop is
            # interrupted (e.g. KeyboardInterrupt in the notebook).
            consumer.close()
    
    

Message published successfully. Data: {"userId": "j11223344", "firstSeen": 1631706818, "lastSeen": 1631706818}
Message published successfully. Data: {"userId": "j87654321", "firstSeen": 1631706823, "lastSeen": 1631706823}
Message published successfully. Data: {"userId": "j12345678", "firstSeen": 1631706828, "lastSeen": 1631706828}
Message published successfully. Data: {"userId": "j11223344", "firstSeen": 1631706818, "lastSeen": 1631706833}
Message published successfully. Data: {"userId": "j87654321", "firstSeen": 1631706823, "lastSeen": 1631706838}
Message published successfully. Data: {"userId": "j44332211", "firstSeen": 1631706843, "lastSeen": 1631706843}
Message published successfully. Data: {"userId": "j11223344", "firstSeen": 1631706818, "lastSeen": 1631706848}


KeyboardInterrupt: 