# Kafka Consumer

Using the `kafka-python` module, Kafka consumers can be instantiated via the `KafkaConsumer` class:

```python
#--- A TYPICAL CONSUMER
import msgpack
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['62.30.10.23:9092'],  #<<<--- list of brokers
    security_protocol="SSL",                 #<<<--- security protocol (if any)
    ssl_cafile="./ca.pem",                   #<<<--- certificate details (if any)
    ssl_certfile="./service.cert",           #           ...
    ssl_keyfile="./service.key",             #           ...
    value_deserializer=msgpack.unpackb,      #<<<--- message value deserialization function (e.g. unpack the message from a specific format)
    auto_offset_reset='earliest',            #<<<--- start from the earliest message when the group has no committed offset
    group_id="group_A",                      #<<<--- identify this consumer as part of group_A
)
```

Once again, we'll keep things simple and create a consumer without any of the optional settings shown above.

In [1]:
# define the list of brokers in the cluster
KAFKA_BOOTSTRAP_SERVERS = ['kafka-broker:9092']

In [2]:
from kafka import KafkaConsumer

# create a Kafka consumer instance
consumer = KafkaConsumer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,  # list of Kafka brokers
    consumer_timeout_ms=10000                   # maximum time to wait for a new message 
                                                # before stopping the consumer
)

Inspect the available topics on the brokers:

In [3]:
# list all available topics on the kafka brokers
consumer.topics()

{'my_awesome_topic'}

In the Publish/Subscribe (Pub/Sub) model, before consuming messages from a specific topic, you need to subscribe to that topic. By subscribing, the consumer expresses its interest in receiving messages from the specified topic.

It's important to note that subscribing to a topic doesn't immediately consume any messages.

Instead, it registers the consumer's interest in the topic. The partitions of the subscribed topic, hosted on the Kafka brokers, are assigned to the consumer once it starts polling.

Once you have subscribed to the topic, the consumer can start polling: each poll fetches messages from the assigned partitions and returns them to the consumer for further processing.

In [4]:
# subscribe to a topic (the `topics` argument is a list of topic names)
consumer.subscribe(['my_awesome_topic'])

# check the active subscriptions
consumer.subscription()

{'my_awesome_topic'}

The `KafkaConsumer` class also lets you inspect topics (for instance, their number of partitions), but **not** modify them.

We can inspect how many partitions the specific topic is made of:

In [5]:
# print the list of partition IDs 
# e.g. a topic with three partitions will have partition IDs {0, 1, 2}
consumer.partitions_for_topic('my_awesome_topic')

{0}

We can instruct the consumer to `poll` (i.e., ask for the new messages stored in the topic) with a given cadence/logic.

For instance, `max_records=10` would cap each call at 10 messages, while `timeout_ms` sets how long (in milliseconds) a call waits for new data before returning.

In [6]:
# poll the brokers once for new messages
consumer.poll(timeout_ms=0,         # return immediately with whatever is already available
              max_records=None,     # no explicit limit (the consumer's `max_poll_records` setting applies)
              update_offsets=True   # advance the reading position past the returned records
             )

{}
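
Reading a limited number of messages per call, with a bounded wait for new data, can be sketched as an explicit `poll` loop. This is a minimal sketch and not part of the original notebook: the helper name `poll_in_batches` and its `max_empty_polls` stop condition are assumptions made for illustration. It relies only on `poll` returning a dictionary that maps each topic-partition to a list of records.

```python
def poll_in_batches(consumer, batch_size=10, wait_ms=1000, max_empty_polls=3):
    """Yield lists of at most `batch_size` records until the topic goes quiet.

    `consumer` is expected to behave like a subscribed `KafkaConsumer`:
    `poll()` returns a dict {TopicPartition: [ConsumerRecord, ...]}.
    """
    empty_polls = 0
    while empty_polls < max_empty_polls:
        # wait up to `wait_ms` for data, returning at most `batch_size` records
        batches = consumer.poll(timeout_ms=wait_ms, max_records=batch_size)
        # flatten the per-partition lists into a single list of records
        records = [rec for recs in batches.values() for rec in recs]
        if not records:
            empty_polls += 1   # nothing arrived within `wait_ms`
            continue
        empty_polls = 0
        yield records
```

With a subscribed consumer, this could be used as `for batch in poll_in_batches(consumer): ...`, processing each batch before asking for the next one.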

The empty dictionary means that this call returned no records. The consumer can now keep polling for messages until it is stopped or reaches its timeout.

Let's iterate over the consumer to read the incoming messages:

In [8]:
# this consumer will keep polling for messages 
# until stopped by the user
# (or reaches the consumer_timeout_ms, if specified)
for message in consumer:
    print (message)

ConsumerRecord(topic='my_awesome_topic', partition=0, offset=11, timestamp=1685000547307, timestamp_type=0, key=None, value=b'a message from the revived producer', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=35, serialized_header_size=-1)


The reading offset can also be reset to the beginning of the topic, allowing for re-reading the entire topic:

In [9]:
# go back to the beginning of the topic
consumer.seek_to_beginning()

# read the entire topic and print it out
for message in consumer:
    print (message)

ConsumerRecord(topic='my_awesome_topic', partition=0, offset=0, timestamp=1684998788318, timestamp_type=0, key=None, value=b'hello..', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=7, serialized_header_size=-1)
ConsumerRecord(topic='my_awesome_topic', partition=0, offset=1, timestamp=1684998792027, timestamp_type=0, key=None, value=b'anybody there?', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=14, serialized_header_size=-1)
ConsumerRecord(topic='my_awesome_topic', partition=0, offset=2, timestamp=1684998800626, timestamp_type=0, key=None, value=b'maybe another one', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=17, serialized_header_size=-1)
ConsumerRecord(topic='my_awesome_topic', partition=0, offset=3, timestamp=1684998869326, timestamp_type=0, key=None, value=b'q', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=1, serialized_header_size=-1)
ConsumerRecord(topic='my_awesome_to

The message content (`ConsumerRecord`) is quite verbose, but it is easy to inspect by extracting only the relevant fields.

In [11]:
# import packages to interpret dates
from datetime import datetime

# go back to the beginning of the topic
consumer.seek_to_beginning()

# break down the message into its main components
for message in consumer:
    print ("{%d}:%d [%s] k=%s v=%s" % (message.partition,
                          message.offset,
                          datetime.fromtimestamp(message.timestamp/1000).time(),
                          message.key,
                          message.value))

{0}:0 [07:13:08.318000] k=None v=b'hello..'
{0}:1 [07:13:12.027000] k=None v=b'anybody there?'
{0}:2 [07:13:20.626000] k=None v=b'maybe another one'
{0}:3 [07:14:29.326000] k=None v=b'q'
{0}:4 [07:14:32.301000] k=None v=b'exit'
{0}:5 [07:14:33.803000] k=None v=b'quit'
{0}:6 [07:16:11.211000] k=None v=b'ciao'
{0}:7 [07:23:10.413000] k=None v=b'message 1'
{0}:8 [07:28:28.667000] k=None v=b'a new message'
{0}:9 [07:29:55.211000] k=None v=b'a message from the revived producer'
{0}:10 [07:30:32.642000] k=None v=b'a message from the revived producer'
{0}:11 [07:42:27.307000] k=None v=b'a message from the revived producer'
{0}:12 [07:48:05.369000] k=b'some_key' v=b'a message with key'
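
The same breakdown can be wrapped in a small reusable helper. The sketch below is an illustration, not part of the original notebook: `summarize_record` is a hypothetical name, and the `Record` namedtuple is a stand-in for `ConsumerRecord` so that the snippet runs without a broker. Unlike the cell above, it converts the timestamp in UTC explicitly instead of relying on the local time zone of the machine.

```python
from collections import namedtuple
from datetime import datetime, timezone

# stand-in for kafka-python's ConsumerRecord, reduced to the fields used here
Record = namedtuple("Record", "partition offset timestamp key value")

def summarize_record(record):
    """Format one record as '{partition}:offset [HH:MM:SS.ffffff] k=... v=...'."""
    # Kafka timestamps are expressed in milliseconds since the Unix epoch
    seconds, millis = divmod(record.timestamp, 1000)
    when = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=millis * 1000)
    return "{%d}:%d [%s] k=%s v=%s" % (
        record.partition, record.offset, when.time(), record.key, record.value)

# the first record of the topic, as printed above
print(summarize_record(Record(0, 0, 1684998788318, None, b"hello..")))
```

Inside the reading loop, `print(summarize_record(message))` would replace the inline formatting.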


Let's change the topic to which the consumer is subscribed and make it a partitioned one:

In [12]:
# subscribe to a partitioned topic (this replaces the previous subscription)
consumer.subscribe(['a_partitioned_topic'])
consumer.subscription()

{'a_partitioned_topic'}

By inspecting the number of partitions for this topic, we can now see that there are 2 partitions: partition #0 and partition #1.

In [13]:
# check the partitions in the partitioned topic
consumer.partitions_for_topic('a_partitioned_topic')

{0, 1}

When reading from a partitioned topic, it's easy to observe that the messages are distributed across the two partitions in a seemingly arbitrary way. Note that offsets are counted independently within each partition.

In [15]:
# import json to unpack json strings
import json

# go back to the beginning of the topic
consumer.seek_to_beginning()

# read messages from the beginning of the topic
# decode the json into the python dictionary format
for message in consumer:
    print ("%d:%d:\t v=%s" % (message.partition,
                          message.offset,
                          json.loads(message.value)))

1:0:	 v={'name': 'Andy', 'surname': 'Johnson', 'amount': '997.55', 'delta_t': '4.83', 'flag': 0}
1:1:	 v={'name': 'John', 'surname': 'Jones', 'amount': '799.96', 'delta_t': '8.65', 'flag': 0}
1:2:	 v={'name': 'Andy', 'surname': 'Jones', 'amount': '759.91', 'delta_t': '4.78', 'flag': 0}
1:3:	 v={'name': 'Alice', 'surname': 'Millers', 'amount': '47.85', 'delta_t': '8.94', 'flag': 1}
1:4:	 v={'name': 'Alice', 'surname': 'Jones', 'amount': '925.13', 'delta_t': '4.44', 'flag': 0}
1:5:	 v={'name': 'Joe', 'surname': 'Johnson', 'amount': '203.05', 'delta_t': '6.15', 'flag': 0}
1:6:	 v={'name': 'John', 'surname': 'Millers', 'amount': '592.80', 'delta_t': '5.15', 'flag': 0}
1:7:	 v={'name': 'Alice', 'surname': 'Jones', 'amount': '146.59', 'delta_t': '0.04', 'flag': 0}
1:8:	 v={'name': 'Joe', 'surname': 'Johnson', 'amount': '481.44', 'delta_t': '4.79', 'flag': 1}
1:9:	 v={'name': 'Joe', 'surname': 'Jones', 'amount': '532.83', 'delta_t': '7.42', 'flag': 0}
0:0:	 v={'name': 'Alice', 'surname': 'Jon
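
Instead of calling `json.loads` inside the loop, the decoding can be delegated to the consumer through the `value_deserializer` argument shown at the top of this notebook. The following is a minimal sketch under that assumption; `json_deserializer` is a hypothetical helper name, and the consumer wiring is left as a comment because it needs a reachable broker.

```python
import json

def json_deserializer(raw):
    """Decode a UTF-8 JSON payload; pass through missing values unchanged."""
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))

# Assumed wiring (requires a reachable broker, so it is left commented out):
# consumer = KafkaConsumer('a_partitioned_topic',
#                          bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
#                          value_deserializer=json_deserializer)

print(json_deserializer(b'{"name": "Andy", "flag": 0}'))
```

With this in place, `message.value` would already be a Python dictionary when the record reaches the loop.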

## Creating a consumer accessing only one partition

Publishing records to a partitioned topic is typically transparent to the user: the producer publishes to the topic, its partitioner picks a partition for each record, and the record is sent to that partition's leader broker, after which it is replicated to the followers.

The same applies to a generic consumer. As we have just seen, data is consumed from all partitions within the topic.

In some cases, however, it may be more suitable to instantiate multiple consumers, with each one reading from a specific partition of a topic.

Let's create a consumer specifically designed to access the data from partition #0 of the previous partitioned topic.

In [16]:
# import TopicPartition to be able to assign a partition to a consumer
from kafka import TopicPartition

# create a standard consumer with a timeout of 10 seconds
consumer_part_0 = KafkaConsumer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
                                client_id='consumer_n_0',
                                consumer_timeout_ms=10000)

# assign the consumer to a specific topic-partition combination
consumer_part_0.assign([TopicPartition('a_partitioned_topic', # topic
                                       0                      # partition id
                                       )
                        ]) 

In [17]:
# go back to the beginning of the topic (for this specific partition)
consumer_part_0.seek_to_beginning()

# read messages from the beginning of the topic (for this specific partition)
# decode the json into the python dictionary format
for message in consumer_part_0:
    print ("%d:%d:\t v=%s" % (message.partition,
                          message.offset,
                          json.loads(message.value)))

0:0:	 v={'name': 'Alice', 'surname': 'Jones', 'amount': '931.01', 'delta_t': '4.72', 'flag': 1}
0:1:	 v={'name': 'Andy', 'surname': 'Jones', 'amount': '306.79', 'delta_t': '2.46', 'flag': 0}
0:2:	 v={'name': 'John', 'surname': 'Millers', 'amount': '571.64', 'delta_t': '4.32', 'flag': 0}
0:3:	 v={'name': 'Joe', 'surname': 'Millers', 'amount': '990.07', 'delta_t': '0.14', 'flag': 0}
0:4:	 v={'name': 'Andy', 'surname': 'Millers', 'amount': '965.25', 'delta_t': '2.72', 'flag': 0}
0:5:	 v={'name': 'Andy', 'surname': 'Johnson', 'amount': '391.60', 'delta_t': '8.13', 'flag': 1}
0:6:	 v={'name': 'Joe', 'surname': 'Millers', 'amount': '878.00', 'delta_t': '4.85', 'flag': 1}
0:7:	 v={'name': 'John', 'surname': 'Johnson', 'amount': '996.09', 'delta_t': '2.24', 'flag': 0}
0:8:	 v={'name': 'Alice', 'surname': 'Johnson', 'amount': '834.41', 'delta_t': '6.51', 'flag': 0}
0:9:	 v={'name': 'Andy', 'surname': 'Jones', 'amount': '974.99', 'delta_t': '9.33', 'flag': 0}


## Creating a consumer group

Multiple consumers can read from the same topic.

In Kafka, consumers are normally organized into consumer groups (even a single consumer can form a group of its own).

A consumer group consists of one or more cooperating consumers that read from the same topic. The group balances the load across its members by dividing the topic's partitions among them, so that each partition is read by only one consumer in the group.

If a consumer within a consumer group fails, the group rebalances: the partitions it was reading are reassigned to the remaining consumers, so the group as a whole keeps reading all the data from the subscribed topic.
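
The way a group divides partitions among its members, and redistributes them when a member fails, can be illustrated with a toy model. This is only a sketch of the idea using a simple round-robin rule: it does not reproduce the assignment strategies that Kafka clients actually use, and `assign_round_robin` is a hypothetical helper.

```python
def assign_round_robin(partitions, members):
    """Give each partition to exactly one member, cycling through the members."""
    if not members:
        raise ValueError("a consumer group needs at least one member")
    ordered_members = sorted(members)
    assignment = {member: [] for member in ordered_members}
    for i, partition in enumerate(sorted(partitions)):
        assignment[ordered_members[i % len(ordered_members)]].append(partition)
    return assignment

partitions = [0, 1]

# two live consumers: one partition each
print(assign_round_robin(partitions, ["consumer_one", "consumer_two"]))

# `consumer_two` fails: after the rebalance, `consumer_one` owns both partitions
print(assign_round_robin(partitions, ["consumer_one"]))
```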

In [18]:
# create `consumer_one` and make it a member of a consumer group
consumer_one = KafkaConsumer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
                             client_id='consumer_one',
                             group_id='my_group',                       # the same group will be used by all consumers
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=10000)

In [19]:
from kafka import ConsumerRebalanceListener

# subscribe `consumer_one` to the partitioned topic
consumer_one.subscribe(['a_partitioned_topic'],
                       listener=ConsumerRebalanceListener()             # callbacks invoked when the group rebalances,
                                                                        # i.e. when partitions are revoked from
                                                                        # or assigned to this consumer
                       )
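
To actually observe rebalances, the listener can be subclassed so that its two callbacks record what happens. The sketch below is an assumption-laden illustration: `LoggingRebalanceListener` is a hypothetical class, and the `ImportError` fallback exists only so that the snippet can run where `kafka-python` is not installed.

```python
try:
    from kafka import ConsumerRebalanceListener
except ImportError:
    # fallback so the sketch can run without kafka-python installed
    ConsumerRebalanceListener = object

class LoggingRebalanceListener(ConsumerRebalanceListener):
    """Record every rebalance event so partition ownership can be inspected."""

    def __init__(self):
        self.events = []

    def on_partitions_revoked(self, revoked):
        # called before a rebalance takes partitions away from this consumer
        self.events.append(("revoked", sorted(revoked)))

    def on_partitions_assigned(self, assigned):
        # called after a rebalance hands partitions to this consumer
        self.events.append(("assigned", sorted(assigned)))
```

An instance would be passed as `listener=LoggingRebalanceListener()` when subscribing, and its `events` list inspected after a second consumer joins or leaves the group.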

---

### Consuming messages as a group
Use the `ConsumerGroup` notebook to:
1. create a second consumer `consumer_two`
2. assign it to the same consumer group `my_group`
3. subscribe to the same topic `a_partitioned_topic`

Each consumer within a group runs as an independent process (in parallel with the others) and receives only a fraction of the incoming data.

In [20]:
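# check the partitions assigned to `consumer_one`
# (the set stays empty until the consumer has joined the group, which happens when it starts polling)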
consumer_one.assignment()

set()

Start the two consumers and process data in parallel from the two partitions.

Typically, one would achieve this by running the two consumers in separate threads, processes, or executors, depending on the desired computing architecture.

In [22]:
# use multiple consumers in parallel (`consumer_one` in this notebook)
for message in consumer_one:
    print ("%d:%d: k=%s v=%s" % (message.partition,
                          message.offset,
                          message.key,
                          json.loads(message.value)))

0:10: k=None v={'name': 'Alice', 'surname': 'Johnson', 'amount': '304.43', 'delta_t': '7.44', 'flag': 0}
0:11: k=None v={'name': 'Andy', 'surname': 'Smith', 'amount': '481.50', 'delta_t': '8.45', 'flag': 0}
0:12: k=None v={'name': 'Joe', 'surname': 'Smith', 'amount': '240.66', 'delta_t': '6.20', 'flag': 1}
0:13: k=None v={'name': 'Andy', 'surname': 'Jones', 'amount': '996.98', 'delta_t': '4.99', 'flag': 0}
0:14: k=None v={'name': 'John', 'surname': 'Millers', 'amount': '134.24', 'delta_t': '8.64', 'flag': 0}
0:15: k=None v={'name': 'Andy', 'surname': 'Millers', 'amount': '763.71', 'delta_t': '3.43', 'flag': 0}
0:16: k=None v={'name': 'John', 'surname': 'Smith', 'amount': '582.05', 'delta_t': '1.95', 'flag': 1}
0:17: k=None v={'name': 'Andy', 'surname': 'Smith', 'amount': '587.28', 'delta_t': '1.30', 'flag': 1}
0:18: k=None v={'name': 'Alice', 'surname': 'Johnson', 'amount': '628.08', 'delta_t': '4.92', 'flag': 0}
0:19: k=None v={'name': 'Joe', 'surname': 'Smith', 'amount': '941.20', 'd

---

## Reading from the Kafka+Spark results topic

Let's subscribe to the `results` topic and monitor the records flagged as fraud.

In [26]:
consumer.subscribe(['results'])

for message in consumer:
    print ("%d:%d: k=%s v=%s" % (message.partition,
                          message.offset,
                          message.key,
                          message.value))
    print ('      --> sending alert message to user {}\n'.format(message.key.decode('ascii')))

1:0: k=b'JoeJohnson' v=b'1'
      --> sending alert message to user JoeJohnson

1:1: k=b'JoeMillers' v=b'1'
      --> sending alert message to user JoeMillers

0:0: k=b'AliceMillers' v=b'1'
      --> sending alert message to user AliceMillers

0:1: k=b'AliceMillers' v=b'1'
      --> sending alert message to user AliceMillers

0:2: k=b'AliceJones' v=b'1'
      --> sending alert message to user AliceJones

0:3: k=b'AliceJones' v=b'1'
      --> sending alert message to user AliceJones

0:4: k=b'AliceMillers' v=b'1'
      --> sending alert message to user AliceMillers

0:5: k=b'AndyMillers' v=b'1'
      --> sending alert message to user AndyMillers

0:6: k=b'AndyJones' v=b'1'
      --> sending alert message to user AndyJones

1:2: k=b'AliceSmith' v=b'1'
      --> sending alert message to user AliceSmith

0:7: k=b'AndyJones' v=b'1'
      --> sending alert message to user AndyJones

0:8: k=b'AndyMillers' v=b'1'
      --> sending alert message to user AndyMillers

0:9: k=b'AndySmith' v=b'1'
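
Since the same user can appear in several flagged records, it may be useful to aggregate the alerts per user before notifying anyone. The following is a minimal sketch, not part of the original notebook: `count_alerts` is a hypothetical helper, the `Record` namedtuple stands in for `ConsumerRecord`, and treating the value `b'1'` as the fraud flag is an assumption based on the output above.

```python
from collections import Counter, namedtuple

# stand-in for kafka-python's ConsumerRecord, reduced to the fields used here
Record = namedtuple("Record", "partition offset key value")

def count_alerts(records):
    """Count flagged records per user; the user name is the ASCII-encoded key."""
    counts = Counter()
    for record in records:
        if record.key is None or record.value != b"1":
            continue  # skip records without a user or without the assumed flag
        counts[record.key.decode("ascii")] += 1
    return counts

# a few records copied from the output above
sample = [
    Record(1, 0, b"JoeJohnson", b"1"),
    Record(0, 0, b"AliceMillers", b"1"),
    Record(0, 1, b"AliceMillers", b"1"),
]
print(count_alerts(sample))
```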
 