### Installs

In [None]:
!pip3 install confluent-kafka

### Imports

In [3]:
import json
import uuid
from os import getenv
import confluent_kafka

### Globals

In [8]:
# Topic to stream from, taken from the KAFKA_TOPIC environment variable
# (None when the variable is not set).
KAFKA_TOPIC_NAME = getenv('KAFKA_TOPIC', None)

### Consumer Functions

In [9]:
def connect_to_kafka_consumer() -> confluent_kafka.Consumer:
    """
    Creates a Kafka consumer that authenticates with SASL_SSL / PLAIN.

    Connection details are read from the environment:
        KAFKA_SERVER   -> 'bootstrap.servers'
        KAFKA_USERNAME -> 'sasl.username'
        KAFKA_PASSWORD -> 'sasl.password'

    Every call joins a new, uuid-suffixed consumer group, and auto-commit is
    disabled, so no committed offsets exist for the group and
    'auto.offset.reset' = 'earliest' decides where reading starts.

    Returns:
        A new, not-yet-subscribed confluent_kafka.Consumer.

    Raises:
        RuntimeError: if any of the required environment variables is unset.
    """
    bootstrap_servers = getenv('KAFKA_SERVER')
    sasl_username = getenv('KAFKA_USERNAME')
    sasl_password = getenv('KAFKA_PASSWORD')
    security_protocol = 'SASL_SSL'
    sasl_mechanisms = 'PLAIN'

    # Fail fast with a readable message instead of handing None to the client.
    missing = [
        name
        for name, value in (
            ('KAFKA_SERVER', bootstrap_servers),
            ('KAFKA_USERNAME', sasl_username),
            ('KAFKA_PASSWORD', sasl_password),
        )
        if value is None
    ]
    if missing:
        raise RuntimeError(
            'Missing required environment variable(s): ' + ', '.join(missing)
        )

    return confluent_kafka.Consumer({
        'bootstrap.servers': bootstrap_servers,
        # Unique group per call, so previously committed offsets never apply.
        'group.id': f'deloton-group-three{uuid.uuid1()}',
        'security.protocol': security_protocol,
        'sasl.mechanisms': sasl_mechanisms,
        'sasl.username': sasl_username,
        'sasl.password': sasl_password,
        'session.timeout.ms': 6000,
        'heartbeat.interval.ms': 1000,
        'fetch.wait.max.ms': 6000,
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': 'false',
        # 86400000 ms = 24 h allowed between poll() calls.
        'max.poll.interval.ms': '86400000',
        # -1 disables the periodic metadata refresh.
        'topic.metadata.refresh.interval.ms': "-1",
        "client.id": 'id-002-005',
    })


def stream_kafka_topic(c:confluent_kafka.Consumer, topic: str, number_of_logs: int, poll_timeout: float = 1.0) -> list:
    """
    Subscribes `c` to `topic`, polls until `number_of_logs` entries are
    collected, then closes the consumer.

    Each delivered message contributes its JSON-decoded value. A poll that
    times out without a message contributes the placeholder string
    'No message available', and that placeholder counts towards
    `number_of_logs`.

    Args:
        c: consumer to read from. It is always closed before returning, so it
            cannot be reused afterwards.
        topic: name of the topic to subscribe to.
        number_of_logs: number of entries to collect.
        poll_timeout: seconds each poll() waits for a message (default 1.0).

    Returns:
        The collected entries. If streaming is interrupted (KeyboardInterrupt),
        the entries collected up to that point are returned.

    Raises:
        confluent_kafka.KafkaException: if the consumer delivers an error event.
        json.JSONDecodeError: if a message value is not valid JSON.
    """
    c.subscribe([topic])
    data = []
    try:
        while len(data) < number_of_logs:
            message = c.poll(poll_timeout)
            if message is None:
                # Poll timed out; record a placeholder so gaps stay visible.
                data.append('No message available')
                continue
            if message.error():
                raise confluent_kafka.KafkaException(message.error())
            data.append(json.loads(message.value().decode('utf-8')))
    except KeyboardInterrupt:
        # Stopping the cell by hand is a normal way to end streaming in a
        # notebook; keep what was collected rather than returning None.
        pass
    finally:
        c.close()
    return data

### Connect to Consumer

In [5]:
# Create the consumer passed to stream_kafka_topic() in the "Stream Data" cell.
# NOTE: stream_kafka_topic() closes the consumer it is given, so re-run this
# cell before streaming again.
consumer = connect_to_kafka_consumer()

### Stream Data

In [None]:
# Stream entries from the configured topic; `consumer` is closed afterwards.
data = stream_kafka_topic(
    c=consumer,
    topic=KAFKA_TOPIC_NAME,
    number_of_logs=20,
)

In [6]:
# Inspect the collected entries. Polls that timed out appear as the string
# 'No message available'; other entries are the decoded JSON message values.
# NOTE(review): the rendered output contains user records (name, email,
# address) — clear or redact it before sharing the notebook.
data

['No message available',
 {'log': '--------- beginning of main\n'},
 {'log': '--------- beginning of a new ride\n'},
 {'log': '2022-07-06 23:29:35.435798 mendoza v9: Getting user data from server..\n'},
 {'log': '2022-07-06 23:29:35.935822 mendoza v9: [SYSTEM] data = {"user_id":100,"name":"Steven Pearce","gender":"male","address":"0 Simpson locks,Wilkinsonshire,E68 8SH","date_of_birth":273715200000,"email_address":"steven.p@gmail.com","height_cm":167,"weight_kg":83,"account_create_date":1647993600000,"bike_serial":"SN0000","original_source":"direct"}\n'},
 {'log': '2022-07-06 23:29:36.435842 mendoza v9: [INFO]: Ride - duration = 1.0; resistance = 0\n'},
 {'log': '2022-07-06 23:29:36.935860 mendoza v9: [INFO]: Telemetry - hrt = 0; rpm = 0; power = 0.0\n'},
 {'log': '2022-07-06 23:29:37.435875 mendoza v9: [INFO]: Ride - duration = 2.0; resistance = 0\n'},
 {'log': '2022-07-06 23:29:37.935885 mendoza v9: [INFO]: Telemetry - hrt = 0; rpm = 0; power = 0.0\n'},
 {'log': '2022-07-06 23:29:38.