# Intro
A broken-up version of [main.py](./main.py) that shows how to put it together, along with the alternatives explored.

1. Started with kafka-python, working from an Aiven example; this turned out to be too old, so I rebuilt it using:
1. Confluent Python, since kafka-python is incompatible or poorly maintained
1. Also see the Quix consumer example

# Confluent Python
The OSS Kafka Python library is unfortunately stale (last public release in 2020) and not great, so it's best to use Confluent's client, which is based on librdkafka.

[Docs](https://docs.confluent.io/kafka-clients/python/current/overview.html) and [GitHub](https://github.com/confluentinc/confluent-kafka-python).

## A note on resetting offsets

I was confused that `auto.offset.reset=earliest` did not cause a reset. This is because the property does not reset anything, which is a common source of confusion. It only applies when Kafka has no committed offset for the consumer group, such as the first time a new (i.e. unknown) group starts committing offsets. It defines where reading starts in that case.

If instead you would like to re-read everything, as I did in development here, you should stop the app and reset the offsets (using a tool such as [Conduktor](https://conduktor.io)).
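
To make the rule concrete, here is a minimal sketch of how the starting position is chosen. It is a plain-Python model, not the client's code: `starting_offset` and its parameters (`committed`, `policy`, `low`, `high`) are hypothetical names, with `low` and `high` standing for the earliest available offset and the end of the partition.

```python
def starting_offset(committed, policy, low, high):
    """Model where a consumer starts reading one partition.

    committed: the group's committed offset, or None if it has none yet.
    policy:    the auto.offset.reset value, "earliest" or "latest".
    low, high: earliest available offset and the end of the partition.
    """
    # A valid committed offset always wins; the policy is not consulted.
    if committed is not None and low <= committed <= high:
        return committed
    # No usable committed offset: fall back to the policy.
    if policy == "earliest":
        return low
    if policy == "latest":
        return high
    raise ValueError(f"unsupported policy: {policy!r}")


# An existing group resumes from its commit, even with "earliest".
print(starting_offset(42, "earliest", low=0, high=100))
# A brand-new group has no commit, so "earliest" starts at the beginning.
print(starting_offset(None, "earliest", low=0, high=100))
```

This is why changing the property on a group that has already committed offsets appears to do nothing.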

In [None]:
pip install confluent-kafka

In [None]:
bootstrap_servers = "localhost:19092"
topics = ["weather_data_demo"]
client_id = "myClientId"
consumer_group = "stuCG"
offset_config = "earliest"

In [None]:
# -- shell script to reset offsets --
# if not using a tool
kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group stuCG --topic weather_data_demo --reset-offsets --to-earliest --execute
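
If you would rather rewind from Python than from the shell, one option is an `on_assign` callback passed to `subscribe()`, which overrides the starting offset each time partitions are assigned. This is a sketch only: `make_rewind_on_assign` is a hypothetical helper, and the sentinel is passed in as a parameter so the snippet can be read without a broker; with the Confluent client it is assumed to be `confluent_kafka.OFFSET_BEGINNING`.

```python
def make_rewind_on_assign(beginning):
    """Build an on_assign callback that rewinds every assigned partition.

    beginning: the "start of partition" sentinel, assumed here to be
    confluent_kafka.OFFSET_BEGINNING when used with the Confluent client.
    """
    def on_assign(consumer, partitions):
        for partition in partitions:
            partition.offset = beginning
        # Hand the modified partitions back so the new offsets take effect.
        consumer.assign(partitions)
    return on_assign


# Hypothetical usage with the consumer from this notebook:
# from confluent_kafka import OFFSET_BEGINNING
# consumer.subscribe(topics, on_assign=make_rewind_on_assign(OFFSET_BEGINNING))
```

Unlike the CLI reset, this rewinds on every assignment, including rebalances, so it is better suited to development than to a long-running app.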

In [None]:
# -- Creating A Consumer ---

from confluent_kafka import DeserializingConsumer, KafkaError, KafkaException
import json
import sys

def json_serializer(msg, s_obj=None):
    # return json.loads(msg.decode('ascii')) # alternative data type
    return json.loads(msg)

conf = {
    'bootstrap.servers': bootstrap_servers,
    'client.id': client_id,
    'group.id': consumer_group,
    # 'security.protocol': 'SSL',
    # 'ssl.ca.location': '../sslcerts/ca.pem',
    # 'ssl.certificate.location': '../sslcerts/service.cert',
    # 'ssl.key.location': '../sslcerts/service.key', 
    # 'key.deserializer': json_serializer, # if key in JSON
    'value.deserializer': json_serializer,
    'auto.offset.reset': offset_config,
    }

consumer = DeserializingConsumer(conf)


# --- Running the consumer ---

running = True

try:
    consumer.subscribe(topics)
    print(f"Subscribed to topics: {topics}")

    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None: 
            print("Waiting for message...")
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                 (msg.topic(), msg.partition(), msg.offset()))
            else:
                raise KafkaException(msg.error())
        else:
            print(f"{msg.partition()}:{msg.offset()}: "
                  f"k={msg.key()} "
                  f"v={msg.value()}")
finally:
    # Close down consumer to commit final offsets.
    consumer.close()    
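
The `json_serializer` above assumes every message carries a JSON value. A slightly more defensive variant is sketched below; `safe_json_deserializer` is a hypothetical name, and returning `None` for messages without a payload is an assumption about how you want those handled.

```python
import json


def safe_json_deserializer(data, ctx=None):
    """Decode a raw Kafka payload as JSON, passing through missing values."""
    # Messages with no payload (for example tombstones) arrive as None.
    if data is None:
        return None
    # json.loads accepts bytes directly, assuming UTF-8/16/32 encoding.
    return json.loads(data)


print(safe_json_deserializer(b'{"temp_c": 21.5}'))
print(safe_json_deserializer(None))
```

It could be dropped in as the `value.deserializer` (or `key.deserializer`) entry of `conf` in place of `json_serializer`.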

In [None]:
# --- Running the consumer ---

running = True

try:
    consumer.subscribe(["weather_data_demo"])

    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None: continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                 (msg.topic(), msg.partition(), msg.offset()))
            else:
                raise KafkaException(msg.error())
        else:
            print("%d:%d: k=%s v=%s" % (
                msg.partition(),
                msg.offset(),
                msg.key(),
                msg.value()))
finally:
    # Close down consumer to commit final offsets.
    consumer.close()    

# Kafka Python
From [kafka-python](https://github.com/dpkp/kafka-python).

I tried to get this working, but I think it fails on the metadata API call, so it can't discover the broker on this newer version of Kafka.
It raises a `NoBrokersAvailable` error, so I moved to the Confluent Python library featured above.
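
Before blaming the client library, it can help to confirm the broker port is reachable at all, since a wrong host or port also surfaces as `NoBrokersAvailable`. The sketch below uses only the standard library; `can_connect` is a hypothetical helper, and a successful TCP connection does not prove that the Kafka protocol handshake will work.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        return False


# Host and port taken from this notebook's settings.
print(can_connect("localhost", 19092))
```

If this prints `False`, the problem is with the address or the network rather than with the library.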

In [None]:
pip install kafka-python

In [None]:
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
import json
import sys

In [None]:
hostname = "localhost"
port = "19092"
topics = ["weather_data_demo"]

In [None]:
# Create the consumer
from kafka import KafkaConsumer

'''
I think there's an issue with the metadata API call in the `kafka` library.
It reports that no brokers are available, and I think that call is where it fails.
'''

def json_serialiser(msg, s_obj=None):
    # kafka-python calls a deserializer with the raw bytes only, so s_obj must be optional
    return json.loads(msg.decode('ascii'))

config = {
    'bootstrap_servers': hostname+":"+port,
    'client_id': 'myClient',
    'group_id': 'ConsumerAlpha',
    # 'security_protocol': 'SSL',
    # 'ssl_cafile': '../sslcerts/ca.pem',
    # 'ssl_certfile': '../sslcerts/service.cert',
    # 'ssl_keyfile': '../sslcerts/service.key',
    'value_deserializer': json_serialiser,
    'key_deserializer': json_serialiser
}

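# consumer = KafkaConsumer(config)  # wrong: positional arguments are treated as topic names
# Topics are positional and settings are keyword arguments. Without bootstrap_servers,
# kafka-python falls back to its default of localhost:9092 rather than port 19092.
c2 = KafkaConsumer(*topics, **config)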

In [None]:
running = True

In [None]:
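# Running the consumer
# Note: this loop uses the Confluent client's API (msg.error(), KafkaError), so it
# expects `consumer` to be the Confluent consumer created earlier, not `c2`.

try:
    consumer.subscribe(topics)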

    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None: 
            print("Waiting for message")
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                 (msg.topic(), msg.partition(), msg.offset()))
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            # print ("%d:%d: k=%s v=%s" % (
            #     msg.partition(),
            #     msg.offset(),
            #     msg.key(),
            #     msg.value()))
            # test the update to f-strings
            print(f"{msg.partition()}:{msg.offset()}: "
                  f"k={msg.key()} "
                  f"v={msg.value()}")

finally:
    # Close down consumer to commit final offsets.
    consumer.close()   
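
For comparison, kafka-python's own `poll()` has a different shape: it takes `timeout_ms` and returns a dict mapping each `TopicPartition` to a list of records whose `partition`, `offset`, `key` and `value` are attributes rather than methods. A minimal sketch of handling that result follows; `format_batches` is a hypothetical helper, and the `Record` tuple only stands in for kafka-python's `ConsumerRecord`.

```python
from collections import namedtuple

# Stand-in for kafka-python's ConsumerRecord, reduced to the fields used here.
Record = namedtuple("Record", ["partition", "offset", "key", "value"])


def format_batches(batches):
    """Flatten a poll() result into printable lines."""
    lines = []
    for records in batches.values():
        for record in records:
            lines.append(f"{record.partition}:{record.offset}: "
                         f"k={record.key} v={record.value}")
    return lines


# With a real consumer this would be: batches = c2.poll(timeout_ms=1000)
batches = {("weather_data_demo", 0): [Record(0, 5, None, {"temp_c": 21.5})]}
for line in format_batches(batches):
    print(line)
```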

# Quix Consumer
Code is an adapted version from [simple-kafka-python](https://github.com/quixio/simple-kafka-python/tree/main).

In [None]:
from quixstreams import Application
import json
import time
# Application top level object

app = Application(
    broker_address = "localhost:19092",
    loglevel = "DEBUG",
    consumer_group = "stu_consumer_group",
    auto_offset_reset = "earliest"
)


max_records = 1000  # Maximum number of records to process
start_time = time.time()  # Start time for elapsed time measurement
max_duration = 100  # Maximum running time in seconds

records_processed = 0

with app.get_consumer() as consumer:
    consumer.subscribe(["customers"]) # subscribing to a topic

    while True:
        msg = consumer.poll(1)
        # breakpoint() # add for debugging
        if msg is None:
            print("Waiting for message...")
        elif msg.error() is not None:
            raise Exception(msg.error())
        # else:
        #     breakpoint() # add for debugging
        else:
            key = msg.key().decode('utf8')
            value = json.loads(msg.value())
            offset = msg.offset()

            print(f"{offset} | {key} | {value} |")
            records_processed += 1

        if records_processed >= max_records or time.time() - start_time >= max_duration:
            print("Maximum records or time limit reached. Exiting...")
            break
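
The exit check at the end of the loop can be pulled into a small function, which makes the two limits easy to test without a broker. This is only a sketch: `should_stop` and its `now` parameter are hypothetical, and it uses `time.monotonic()` instead of `time.time()` so that clock adjustments cannot affect the elapsed time.

```python
import time


def should_stop(records_processed, max_records, start, max_duration, now=None):
    """Return True once either the record limit or the time limit is hit."""
    if now is None:
        now = time.monotonic()
    return records_processed >= max_records or now - start >= max_duration


start = time.monotonic()
print(should_stop(0, max_records=1000, start=start, max_duration=100))
```

Inside the loop, the check would then read `if should_stop(records_processed, max_records, start, max_duration): break`.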
