# Kafka Consumer (Code #2)

In [1]:
# set this variable with one of the following values
# -> 'local'
# -> 'docker_cluster'
CLUSTER_TYPE ='docker_cluster'

In [2]:
import os

KAFKA_BOOTSTRAP_SERVERS = ''

if CLUSTER_TYPE == 'local':

    KAFKA_HOME = '<PATH_TO_YOUR_kafka_2.13-3.2.0_FOLDER>'
    KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
    
    # Start Zookeeper    
    os.system('{0}/bin/zookeeper-server-start.sh {0}/config/zookeeper.properties'.format(KAFKA_HOME)) 
    
    # Start one Kafka Broker
    os.system('{0}/bin/kafka-server-start.sh {0}/config/server.properties'.format(KAFKA_HOME)) 
    
elif CLUSTER_TYPE == 'docker_cluster':

    KAFKA_BOOTSTRAP_SERVERS = ['kafka-broker:9092',""" possibly other brokers in your kafka cluster """]

In [3]:
! pip3 install kafka-python confluent-kafka



In [4]:
from kafka import KafkaConsumer

Kafka consumers can be instantiated via the KafkaConsumer class

```python
#--- A TYPICAL CONSUMER
consumer = KafkaConsumer(
    bootstrap_servers=['62.30.10.23:9092'],  #<<<--- list of brokers
    security_protocol="SSL",                 #<<<--- security protocol (if any) 
    ssl_cafile="./ca.pem",                   #<<<--- certificate details (if any)
    ssl_certfile="./service.cert",           #           ...
    ssl_keyfile="./service.key",             #           ...
    value_deserializer=msgpack.unpackb,      #<<<--- message value deserialization function (e.g. unpack the message from a specific format)
    auto_offset_reset='earliest',            #<<<--- automatically bring the reading offset to the earliest message
    group_id="group_A",                      #<<<--- identify this consumer as part of group_A
)
```


Once more we'll use a simple implementation of the consumer, with no specific configurations used in this example.

In [5]:
# create a consumer pointing to a kafka cluster
consumer = KafkaConsumer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
                         consumer_timeout_ms=10000 # if no message is available for consumption 
                                                   # after 10s stop the consumer
                        )

Inspect the brokers for the available topics

In [6]:
# list all available topics on the kafka brokers
consumer.topics()

{'a_partitioned_topic', 'my_awesome_topic', 'results'}

In the PUB/SUB model, we first need to subscribe to the topic of choice.

Subscribing doesn't mean any message is actually received/consumed... 

It only means that from now on the consumer will be able to poll from the partitions of the chosen topics hosted on the brokers.

In [7]:
# subscribe to topic
consumer.subscribe('my_awesome_topic')

# and check the active subscriptions
consumer.subscription()

{'my_awesome_topic'}

The `consumer` class will also offer a possibility to inspect the topics (for instance in terms of the number of partitions), but **not** to modify them. 

We can inspect how many partitions the specific topic is made of:

In [8]:
# print the list of partition IDs 
# e.g. a topic with tree partitions will have partition IDs {0, 1, 2}
consumer.partitions_for_topic('my_awesome_topic')

{0}

We can instruct the consumer to `poll` (i.e. ask for new messages from the topic) with a given cadence/logic.

For instance we can set the consumer to read out only 10 messages at a time, with a timeout between to subsequent readouts of a given $\Delta t$.

In [9]:
# set up the polling strategy for the consumer
consumer.poll(timeout_ms=0,         #<<--- do not enable dead-times before one poll to the next
              max_records=None,     #<<--- do not limit the number of records to consume at once 
              update_offsets=True   #<<--- update the reading offsets on this topic
             )

{}

Now the consumer is ready to poll messages (untile it is stopped or it reaches a timeout).

Let's look for all messages in the consumer:

In [10]:
# this consumer will keep polling and reading for messages until stopped (or it reaches the consumer_timeout_ms)
for message in consumer:
    print (message)

The reading offset can also be brought back to the beginning of the topic, to re-read the entire topic:

In [11]:
consumer.seek_to_beginning()

for message in consumer:
    print (message)

ConsumerRecord(topic='my_awesome_topic', partition=0, offset=0, timestamp=1656194619376, timestamp_type=0, key=None, value=b'message 1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=9, serialized_header_size=-1)
ConsumerRecord(topic='my_awesome_topic', partition=0, offset=1, timestamp=1656194619414, timestamp_type=0, key=None, value=b'a new message', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=13, serialized_header_size=-1)
ConsumerRecord(topic='my_awesome_topic', partition=0, offset=2, timestamp=1656194619661, timestamp_type=0, key=None, value=b'a message from the revived producer', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=35, serialized_header_size=-1)
ConsumerRecord(topic='my_awesome_topic', partition=0, offset=3, timestamp=1656194619700, timestamp_type=0, key=b'some_key', value=b'a message with key', headers=[], checksum=None, serialized_key_size=8, serialized_value_size=18, serialized_header_s

The message content (`ConsumerRecord`) is quite messy, but can be easily inspected parsing only the relevant infos.

In [12]:
from datetime import datetime

consumer.seek_to_beginning()

# break down the message into its main components
for message in consumer:
    print ("%d:%d [%s] k=%s v=%s" % ( message.partition,
                                      message.offset,
                                      datetime.fromtimestamp(message.timestamp/1000).time(),
                                      message.key,
                                      message.value))

0:0 [22:03:39.376000] k=None v=b'message 1'
0:1 [22:03:39.414000] k=None v=b'a new message'
0:2 [22:03:39.661000] k=None v=b'a message from the revived producer'
0:3 [22:03:39.700000] k=b'some_key' v=b'a message with key'


Let's change the topic to which the consumer is subscribed to a partitioned one:

In [13]:
consumer.subscribe('a_partitioned_topic')
consumer.subscription()

{'a_partitioned_topic'}

By inspecting the number of partitions for this topic we do see now 2 partitions: partition #0 and partition #1

In [14]:
consumer.partitions_for_topic('a_partitioned_topic')

{0, 1}

Reading out from a partitioned topic it's easy to see that the messages are sent to the two partitions in a seemengly arbitrary way:

In [15]:
import json

consumer.seek_to_beginning()

for message in consumer:
    print ("%d:%d:\t v=%s" % (message.partition,
                              message.offset,
                              json.loads(message.value)))

1:0:	 v={'name': 'John', 'surname': 'Jones', 'amount': '4.81', 'delta_t': '9.00', 'flag': 0}
1:1:	 v={'name': 'Joe', 'surname': 'Millers', 'amount': '695.74', 'delta_t': '0.10', 'flag': 0}
1:2:	 v={'name': 'Joe', 'surname': 'Millers', 'amount': '895.50', 'delta_t': '7.90', 'flag': 0}
1:3:	 v={'name': 'Joe', 'surname': 'Johnson', 'amount': '629.66', 'delta_t': '2.30', 'flag': 0}
1:4:	 v={'name': 'John', 'surname': 'Johnson', 'amount': '414.87', 'delta_t': '6.28', 'flag': 0}
1:5:	 v={'name': 'Andy', 'surname': 'Smith', 'amount': '538.80', 'delta_t': '0.46', 'flag': 0}
1:6:	 v={'name': 'Andy', 'surname': 'Johnson', 'amount': '386.12', 'delta_t': '1.84', 'flag': 0}
1:7:	 v={'name': 'Alice', 'surname': 'Smith', 'amount': '452.77', 'delta_t': '9.53', 'flag': 0}
1:8:	 v={'name': 'Andy', 'surname': 'Smith', 'amount': '237.29', 'delta_t': '2.41', 'flag': 0}
1:9:	 v={'name': 'Andy', 'surname': 'Johnson', 'amount': '671.85', 'delta_t': '5.20', 'flag': 0}
1:10:	 v={'name': 'Joe', 'surname': 'Jones

## Creating a consumer accessing only one partition

Publishing records to a partitioned topic is typically transparent for the user: the producer publishes to the topic, and the kafka cluster will redirect the message to the partition leader, later replicating that to the followers.

The same goes for a generic consumer. As we have just seen data is consumed from all partitions within the topic.

In some cases, it can however be more suitable to instantiate multiple consumers, each reading from a specific partition of a topic.

Let's assign a consumer specific to access the data of partition #0 of the previous partitioned topic.

In [16]:
from kafka import TopicPartition

# create a standard consumer
consumer_part_0 = KafkaConsumer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
                                client_id='consumer_n_0',
                                consumer_timeout_ms=10000)

# assign it to a specific topic - partition combination
consumer_part_0.assign([TopicPartition('a_partitioned_topic', 0)]) # <<--- name of the topic, partition id

In [17]:
consumer_part_0.seek_to_beginning()

for message in consumer_part_0:
    print ("%d:%d:\t v=%s" % (message.partition,
                              message.offset,
                              json.loads(message.value)))

0:0:	 v={'name': 'Andy', 'surname': 'Johnson', 'amount': '111.11', 'delta_t': '2.50', 'flag': 1}
0:1:	 v={'name': 'Joe', 'surname': 'Smith', 'amount': '769.24', 'delta_t': '8.36', 'flag': 1}
0:2:	 v={'name': 'Joe', 'surname': 'Jones', 'amount': '480.69', 'delta_t': '2.10', 'flag': 0}
0:3:	 v={'name': 'Alice', 'surname': 'Smith', 'amount': '64.24', 'delta_t': '8.48', 'flag': 1}
0:4:	 v={'name': 'Andy', 'surname': 'Jones', 'amount': '559.06', 'delta_t': '0.29', 'flag': 0}
0:5:	 v={'name': 'Joe', 'surname': 'Millers', 'amount': '847.24', 'delta_t': '0.88', 'flag': 0}
0:6:	 v={'name': 'Alice', 'surname': 'Johnson', 'amount': '604.01', 'delta_t': '7.46', 'flag': 0}
0:7:	 v={'name': 'Alice', 'surname': 'Johnson', 'amount': '518.79', 'delta_t': '8.14', 'flag': 0}
0:8:	 v={'name': 'John', 'surname': 'Jones', 'amount': '590.01', 'delta_t': '2.61', 'flag': 1}
0:9:	 v={'name': 'Andy', 'surname': 'Millers', 'amount': '667.55', 'delta_t': '1.07', 'flag': 0}
0:10:	 v={'name': 'Joe', 'surname': 'Mill

## Creating a consumer group

Multiple consumers can read from the same topic.

In kafka, every consumer is part of a consumer group (even a single consumer is part of its own consumer group, really). 

A consumer group is a number (1 or more) of cooperating consumers gathering data from the same topic, balancing the load across them and redistributing the consume calls dynamically.

If a consumer inside a consumer-group fails, the others from the same group will keep reading the whole data from the topic to which they are subscribed.

In [18]:
# create one consumer_one to read from 1 partition
# assign this consumer to a group
consumer_one = KafkaConsumer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
                             client_id='consumer_one',
                             group_id='my_group',
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=10000)

In [19]:
from kafka import ConsumerRebalanceListener
# subscribe this specific consumer to the partitioned topic
consumer_one.subscribe('a_partitioned_topic',
                       listener=ConsumerRebalanceListener())

#### BREAK (jump to Code #3)
Use the ConsumerGroup notebook to:
1. create a second consumer `consumer_two`
2. assign it to the same consumer group `my_group`
3. subscribe to the same topic `a_partitioned_topic`

Each consumer within a group is going to be an independent process (should be run in parallel from the others) and will provide access to a fraction of the incoming data

In [None]:
consumer_one.assignment()

In [None]:
# Use multiple consumers in parallel --> typically you would run each on a different thread / process / executor
for message in consumer_one:
    print ("%d:%d: k=%s v=%s" % (message.partition,
                          message.offset,
                          message.key,
                          message.value))

## Reading from the Kafka+Spark results topic (Code #??)

Let's subscribe to the `results` topic and monitor the frauds

In [None]:
consumer.subscribe('results')

for message in consumer:
    print ("%d:%d: k=%s v=%s" % (message.partition,
                          message.offset,
                          message.key,
                          message.value))
    print ('      --> sending alert message to user {}\n'.format(message.key.decode('ascii')))