## Overview
Messages written to a topic by producers are read by one or more Kafka consumers. Just as there can be multiple producers, there can be multiple consumers. Consumers are part of a *consumer group*. Within a consumer group, each partition of a topic is assigned to exactly one consumer. The distribution looks like:

<img src="images/cg_scaling.png" width=900 height=auto />

We can have multiple consumer groups consuming the same topic. Each group receives all the messages in the topic, independently of the other groups:

<img src="images/multi_cg.png" width=350 height=auto />

As we can see, the number of consumers in a consumer group should not exceed the number of partitions; otherwise the extra consumers sit idle.
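
The idle-consumer effect can be sketched in plain Java. This is a minimal illustration that assumes a simple round-robin assignment; Kafka's real assignors are configurable and more involved, and the class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not Kafka's actual assignor implementation.
class RoundRobinSketch {

    // Assigns partition ids 0..partitionCount-1 to consumers in round-robin order.
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions shared by 4 consumers: the fourth consumer receives no partition.
        Map<String, List<Integer>> assignment = assign(3, List.of("c1", "c2", "c3", "c4"));
        assignment.forEach((consumer, partitions) ->
            System.out.println(consumer + " -> " + partitions));
    }
}
```

With three partitions and four consumers, one consumer ends up with an empty list, which is the idle case described above.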

## Rebalancing
Partitions assigned to consumers in a consumer group need to be revisited when:
- a consumer fails (no heartbeat)
- a consumer leaves a group (by calling `close()`)
- a new consumer joins (first call to `poll()`)
- new partitions are added to a subscribed topic

Moving partition ownership from one consumer to another is called a *rebalance*. There are two ways to perform rebalancing:  
- **Eager Rebalancing:** all consumers temporarily stop consuming, give up ownership of their partitions, leave the consumer group, rejoin it, and finally get assigned a new set of partitions.
- **Cooperative Rebalancing:** this is similar to eager rebalancing, except that only a subset of the partitions is reassigned; consumers keep consuming from the partitions that do not move. The advantage is that it avoids the *stop-the-world* unavailability of the eager method.
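
The difference between the two approaches comes down to which partitions a consumer has to give up. The sketch below models that with plain sets; it is a simplified illustration under the assumption that the new assignment is already known, and the class and method names are hypothetical rather than part of the Kafka API.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of what one consumer revokes during a rebalance.
class RebalanceSketch {

    // Eager: the consumer gives up every partition it owns.
    static Set<Integer> revokedEager(Set<Integer> owned) {
        return new TreeSet<>(owned);
    }

    // Cooperative: the consumer gives up only partitions missing from its new assignment.
    static Set<Integer> revokedCooperative(Set<Integer> owned, Set<Integer> newAssignment) {
        Set<Integer> revoked = new TreeSet<>(owned);
        revoked.removeAll(newAssignment);
        return revoked;
    }

    public static void main(String[] args) {
        // A consumer owns four partitions; a second consumer joins and takes over two of them.
        Set<Integer> owned = Set.of(0, 1, 2, 3);
        Set<Integer> newAssignment = Set.of(0, 1);
        System.out.println("eager revokes:       " + revokedEager(owned));
        System.out.println("cooperative revokes: " + revokedCooperative(owned, newAssignment));
    }
}
```

In the cooperative case partitions 0 and 1 are never revoked, so consumption from them can continue while 2 and 3 move.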

### Group Coordinator and Group Leader
The group coordinator is the Kafka broker designated to manage the state of a specific consumer group (different groups can have different coordinators). The first consumer to join a consumer group (by sending a `JoinGroup` request) becomes the group leader.

The group leader receives the list of all consumers in the group from the group coordinator and decides the partition ownership for every consumer. Once the partition assignment is decided, it is sent to the group coordinator, which then propagates it to the rest of the consumers in the group. Consumers have to send heartbeats to the group coordinator to be considered active.

## Constructing Consumer
We create an instance of `KafkaConsumer` as follows:

In [None]:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter"); // This is the consumer group (not mandatory)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

If `group.id` is not specified, the consumer will exhibit the following behaviours:
- no offset committing: if the consumer stops and restarts, it will not remember where it left off
- no load balancing: every ungrouped consumer will read all available partitions

For these reasons, `group.id` is almost always specified.

The full list of consumer configuration properties is available [here](https://kafka.apache.org/documentation.html#newconsumerconfigs). Some important ones are:

| Property            | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| fetch.min.bytes     | minimum amount of data that it wants to receive from the broker when fetching records. If a broker receives a request for records from a consumer but the new records amount to fewer bytes, the broker will wait until more messages are available before sending the records back to the consumer.                                                                                                                                                                                                                       |
| fetch.max.wait.ms   | broker will not wait longer than this duration before sending a response back, even if the required fetch.min.bytes has not been met. By default, Kafka will wait up to 500 ms.                                                                                                                                                                                                                                                                                                                                            |
| session.timeout.ms  | amount of time a consumer can be out of contact with the brokers while still being considered alive. Defaults to 45 seconds since Kafka 3.0 (10 seconds in earlier versions). Its value should be greater than `heartbeat.interval.ms`, which controls how often heartbeats are sent. |
| max.poll.interval.ms| maximum amount of time a consumer can spend between calls to `poll()` before it is considered dead and a rebalance is triggered. Defaults to 5 minutes. This is helpful in case consumer thread gets deadlocked or stuck while heartbeat thread keeps on sending heartbeats.                                                                                                                                                                                                                                               |
| auto.offset.reset   | controls the behavior of the consumer when it starts reading a partition for which it has no committed offset, or for which the committed offset is invalid (for example, the consumer was down for so long that the record with that offset has already aged out of the broker). `latest` (the default): lacking a valid offset, the consumer starts reading from the newest records. `earliest`: lacking a valid offset, the consumer reads all the data in the partition, starting from the very beginning. |
| max.poll.records    | maximum number of records that a single call to `poll()` will return.                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
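
As a rough sketch, these properties are set on the same `Properties` object that is passed to the `KafkaConsumer` constructor. Every value below is an illustrative assumption rather than a recommendation, and the class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper that collects the tuning properties from the table above.
class ConsumerTuningSketch {

    static Properties tunedProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("group.id", "CountryCounter");
        props.put("fetch.min.bytes", "1024");         // wait for at least 1 KiB per fetch...
        props.put("fetch.max.wait.ms", "500");        // ...but never longer than 500 ms
        props.put("session.timeout.ms", "30000");     // considered dead after 30 s without contact
        props.put("heartbeat.interval.ms", "10000");  // must stay below session.timeout.ms
        props.put("max.poll.interval.ms", "300000");  // at most 5 minutes between poll() calls
        props.put("max.poll.records", "200");         // cap on records returned by one poll()
        props.put("auto.offset.reset", "earliest");   // start from the beginning if no valid offset
        return props;
    }

    public static void main(String[] args) {
        tunedProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The deserializer properties shown earlier would still need to be added before the object is handed to the consumer.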

## Consuming Message
We start by subscribing to a topic (or multiple topics):

In [None]:
consumer.subscribe(Collections.singletonList("customerCountries"));

By invoking `subscribe`, the consumer:
- indicates that it wants to consume from the given list of topics
- shows intent to join consumer group specified by `group.id` property

At this point:
- no partitions are assigned yet
- no data is fetched
- no offsets are read

**Poll Loop:** 

In [None]:
// Polling for messages - handles all details of coordination,
// partition rebalances, heartbeats, and data fetching
try {
    while (true) {
        // The argument is the timeout: it controls how long poll() will block if no data is
        // available in the consumer buffer. With Duration.ZERO, poll() returns immediately.
        // Requires java.time.Duration; the older poll(long) overload is deprecated.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        
        // Poll returns multiple records. Each record contains the topic and partition the
        // record came from, the offset of the record within the partition, and of course the
        // key and the value of the record.
        for (ConsumerRecord<String, String> record : records) {
            LOGGER.debug("topic = {}, partition = {}, offset = {}, customer = {}, country = {}",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
            
            // The first record seen for a country counts as 1; otherwise increment the stored count
            int updatedCount = 1;
            if (custCountryMap.containsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            
            custCountryMap.put(record.value(), updatedCount);
        }
    }
} finally {
    // Closes the network connections and sockets, and triggers a rebalance
    // immediately instead of waiting for the session timeout to expire.
    consumer.close();
}

`poll` does a lot more than getting messages. The first call to `poll`:
  
  **Group Joining and Coordination:**
  - the consumer contacts a bootstrap broker and discovers the *group coordinator* for `group.id`
  - sends a `JoinGroup` request. If there are no consumers in the group, the current consumer becomes the *group leader*
  - the group coordinator detects that a new member has joined and initiates a rebalance. This means:
      - all consumers in the group temporarily stop consumption
      - the group leader runs the partition assignment strategy and hands it over to group coordinator
      - the group coordinator distributes the assignment plan to all consumers
  - the consumer obtains its partition(s)
  
  **Offset Retrieval:**
  - the consumer asks the group coordinator for the last committed offset of each assigned partition for its `group.id`, so that it knows where to start from
  - once the partitions and starting offsets are determined, the consumer starts fetching messages

Subsequent `poll()` invocations:
- send heartbeat
- handle rebalance
- fetch message
- commit offset (described later)

## Commits
Every message stored in a broker has a sequential, immutable unique identifier associated with it - the message offset. It is essentially the index of the record in the partition. Whenever we poll, we intend to read new messages that were not processed before - continue from the last offset that was read.

In Kafka, it is the consumer's job to store the last read offset for a specific partition in a designated location on the Kafka cluster. This process is called *committing the offset*. It is critical because it tells Kafka: "I have successfully finished processing all messages up to this point. If I restart, begin sending me messages starting from the next offset".

Kafka has a designated topic `__consumer_offsets` that stores offsets:

<img src="images/consumer_offsets.png" />

The commit process looks like:
1. Consumer processes records up to a certain offset (e.g., 1000).
2. Consumer sends an `OffsetCommitRequest` (committing offset 1001) to its group coordinator.
3. The group coordinator receives the request and writes a record to the `__consumer_offsets` topic with the key `⟨my-group-id,orders-topic,5⟩` and the value 1001.
4. Later, when the consumer restarts, it asks the group coordinator for the last committed offset, and the coordinator looks up the most recent value for that key, finds 1001, and tells the consumer to start reading from that point.
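
The steps above can be modelled with a small in-memory map that stands in for the `__consumer_offsets` topic. This is only a sketch of the key and value layout described in the list; the class and method names are hypothetical and no Kafka API is involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the __consumer_offsets topic.
class OffsetStoreSketch {

    // Key: [groupId, topic, partition]; value: the next offset to read.
    private final Map<List<Object>, Long> committed = new HashMap<>();

    // Stores the committed offset, overwriting any earlier value for the same key.
    public void commit(String groupId, String topic, int partition, long nextOffset) {
        committed.put(List.of(groupId, topic, partition), nextOffset);
    }

    // Returns the last committed offset, or -1 if nothing was committed for this key.
    public long fetchCommitted(String groupId, String topic, int partition) {
        return committed.getOrDefault(List.of(groupId, topic, partition), -1L);
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        // Records up to offset 1000 were processed, so the committed value is 1001.
        store.commit("my-group-id", "orders-topic", 5, 1000L + 1);
        // After a restart, the consumer resumes from the stored value.
        System.out.println(store.fetchCommitted("my-group-id", "orders-topic", 5));
    }
}
```

Note that the committed value is the offset of the next record to read, not the offset of the last record processed.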

The consumer internally tracks the offset it has consumed up to and uses it to fetch new messages on each `poll`. Committed offsets are needed only on the first `poll` call or after a rebalance, when partitions are redistributed. Committing offsets is therefore like a checkpoint: it marks where the application should resume if it fails (or shuts down and another consumer has to take over).

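Because committing offsets is independent of fetching messages with the internal offset, the two can fall out of sync. For example, if the committed offset lags behind the last processed message, the messages in between are processed again after a restart or rebalance:  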

<img src="images/duplicate_processing.png" />
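
The size of the duplicate window is simple arithmetic. The sketch below assumes, as in the commit example earlier, that the committed value is the next offset to read; the class and method names are hypothetical.

```java
// Hypothetical helper that counts records processed twice after a restart.
class CommitGapSketch {

    // Consumption resumes at committedOffset, so every offset from committedOffset
    // through lastProcessedOffset is delivered and processed a second time.
    static long reprocessedAfterRestart(long committedOffset, long lastProcessedOffset) {
        return Math.max(0, lastProcessedOffset - committedOffset + 1);
    }

    public static void main(String[] args) {
        // Committed offset is 6, but the consumer had already processed up to offset 10.
        System.out.println(reprocessedAfterRestart(6, 10));
    }
}
```

Committing more often narrows this window, at the cost of more commit requests.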

### Committing Offsets
There are a variety of ways to commit offsets:  
- **Automatic Commits:** with `enable.auto.commit=true`, the consumer commits the largest offset your client received from `poll()` every five seconds (the default, controlled by `auto.commit.interval.ms`). Just like everything else in the consumer, the automatic commits are driven by the `poll` loop. Whenever we `poll`, the consumer checks if it is time to commit, and if it is, it will commit the offsets it returned in the last `poll`.

- **Commit Synchronously:** by setting `enable.auto.commit=false`, offsets will only be committed when the application explicitly chooses to do so (using `commitSync()`).

In [None]:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // java.time.Duration
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    
    try {
        consumer.commitSync(); // commits the latest offset returned by the last poll()
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
}

- **Commit Asynchronously:** `commitAsync()` sends the commit request and continues without waiting for the broker's response, which improves the throughput of the application. The drawback is that while `commitSync()` will retry the commit until it either succeeds or encounters a nonretriable failure, `commitAsync()` will not retry.

In [None]:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // java.time.Duration
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    
    consumer.commitAsync();
}

// commitAsync also accepts a callback that is invoked when the commit completes
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            log.error("Commit failed for offsets {}", offsets, exception);
        }
    }
});

A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time we commit and add the sequence number at the time of the commit to the `commitAsync` callback. When we’re getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don’t retry because a newer commit was already sent.
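
The sequence-number check can be sketched independently of the Kafka client. The class below is a hypothetical helper, not part of the Kafka API: it hands out a sequence number for every commit and tells a failed commit whether a newer one has been sent since.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical guard for deciding whether a failed asynchronous commit may be retried.
class CommitSequenceGuard {

    private final AtomicLong latestSequence = new AtomicLong();

    // Call right before each commit is sent; capture the returned value for the callback.
    public long nextSequence() {
        return latestSequence.incrementAndGet();
    }

    // Call from a failed commit's callback with the value captured at send time.
    // A retry is safe only if no newer commit has been sent in the meantime.
    public boolean safeToRetry(long sequenceAtSend) {
        return sequenceAtSend == latestSequence.get();
    }

    public static void main(String[] args) {
        CommitSequenceGuard guard = new CommitSequenceGuard();
        long first = guard.nextSequence();   // commit #1 is sent
        long second = guard.nextSequence();  // commit #2 is sent before #1's callback fires
        System.out.println("retry #1? " + guard.safeToRetry(first));
        System.out.println("retry #2? " + guard.safeToRetry(second));
    }
}
```

In a real consumer, the captured sequence number would be held by the `OffsetCommitCallback` passed to `commitAsync`, and the retry would run only when `safeToRetry` returns true.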

- **Combined:** a common pattern is to combine `commitAsync()` with `commitSync()` just before shutdown.

In [None]:
try {
    while (!closing) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // java.time.Duration
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitAsync();
    }
    consumer.commitSync();
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    consumer.close();
}

What if the number of records received in poll is high and we want to commit in between processing? The following code commits after every 1000 records processed using an alternate form of `commitAsync`:

In [None]:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
// ...

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // java.time.Duration
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
    
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), 
            new OffsetAndMetadata(record.offset()+1, "no metadata"));
        if (count % 1000 == 0)
            consumer.commitAsync(currentOffsets, null);
        count++;
    }
}

### Reading from Specific Offsets
If we want to start reading all messages from the beginning of the partition, or we want to skip all the way to the end of the partition and start consuming only new messages, there are APIs specifically for that:

In [None]:
// Collection because a consumer may be assigned multiple partitions
public void seekToBeginning(Collection<TopicPartition> partitions);

public void seekToEnd(Collection<TopicPartition> partitions);

public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);

In [None]:
// Read logs from the beginning
final String TOPIC = "logs";

consumer.subscribe(List.of(TOPIC));
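// A single poll(0) may return before the group join completes, so poll until partitions
// are assigned; records returned here can be ignored because the seek below rewinds anyway
while (consumer.assignment().isEmpty()) {
    consumer.poll(Duration.ofMillis(100)); // java.time.Duration
}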

consumer.seekToBeginning(consumer.assignment()); // goto start for all assigned partitions