In [1]:
%classpath add mvn org.apache.kafka kafka_2.12 2.3.0

## Consumers and Consumer Groups

- Suppose an application reads messages from a Kafka topic, runs some validations against them, and writes the results to another data store
- A single consumer is sufficient as long as it can keep up with the rate at which messages are produced; if producers write faster than it can process, the application falls further and further behind
- *Consumer groups* scale consumption from topics by allowing multiple consumers to read from the same topic, splitting the data between them
- Kafka consumers are typically part of a consumer group; when multiple consumers subscribe to a topic and belong to the same consumer group, each consumer in the group receives messages from a different subset of the partitions in the topic

<img src="img/Snip20200424_6.png" width=80%/>

- Within a consumer group, a partition is assigned to at most one consumer; when the number of consumers exceeds the number of partitions, some consumers will be idle and get no messages

- In addition to adding consumers in order to scale a single application, it is common to have multiple applications that need to read data from the same topic
    - In such scenarios, we want each application to get all of the messages, rather than a subset
    - To ensure every application gets all the messages in a topic, each application needs its own consumer group
    
<img src="img/Snip20200424_7.png" width=80%/>

- In summary, create a new consumer group for each application that needs all the messages from one or more topics; add consumers to an existing consumer group to scale the reading and processing of messages from the topics
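The effect of adding consumers to a group can be sketched without a broker. The class below is a hypothetical illustration, not Kafka's assignor: it simply deals partition numbers to consumers in turn, which is enough to show that a consumer beyond the partition count ends up with nothing to read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Kafka client.
final class PartitionSpread {
    /** Deals partitions 0..partitionCount-1 to the consumers in turn. */
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            // Each partition gets exactly one owner within the group
            owned.get(p % consumerCount).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 4 partitions, 2 consumers: each consumer owns two partitions
        System.out.println(assign(4, 2)); // [[0, 2], [1, 3]]
        // 4 partitions, 5 consumers: the fifth consumer is idle
        System.out.println(assign(4, 5)); // [[0], [1], [2], [3], []]
    }
}
```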

## Consumer Groups and Partition Rebalance

- When a new consumer is added to the group, it starts consuming messages from partitions previously consumed by another consumer; same thing happens when a consumer shuts down or crashes
- Reassignment of partitions to consumers also happens when the topics the consumer group is consuming are modified (e.g., when partitions are added)
- Moving partition ownership from one consumer to another is called a *rebalance*; rebalances are important because they provide the consumer group with high availability and scalability
- During a rebalance, consumers can't consume messages, so a rebalance is basically a short window of unavailability of the entire consumer group
    - Also, when partitions are moved from one consumer to another, the consumer loses its current state



- Consumers maintain membership in a consumer group and ownership of the partitions assigned to them by sending *heartbeats* to a Kafka broker designated as the *group coordinator*
    - If the consumer stops sending heartbeats for long enough, its session will time out and the group coordinator will consider it dead and trigger a rebalance



## Creating a Kafka Consumer

- The first step to start consuming records is to create a `KafkaConsumer` instance
- The three mandatory properties: `bootstrap.servers`, `key.deserializer`, and `value.deserializer`
- Additionally, `group.id` specifies the consumer group the `KafkaConsumer` instance belongs to


In [9]:
package kafka_definitive_guide.consumer;

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:19092");
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CountryCounter");
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);

null

## Offset and Consumer Position

- Kafka maintains a numerical offset for each record in a partition
- The offset acts as a unique identifier of a record within the partition and also denotes the position of the consumer in the partition
    - E.g., a consumer at position 5 has consumed records with offsets 0 through 4


- The `position` of the consumer gives the offset of the next record that will be given out, which is one larger than the highest offset the consumer has seen in the partition.
    - This automatically advances every time the consumer receives messages in a call to `poll(Duration)`
    
    
- The `committed position` is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to.
    - The consumer can either automatically commit offsets periodically, or choose to control this committed position manually by calling one of the commit APIs (e.g., `commitSync` and `commitAsync`)
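The relationship between the position and the committed position can be made concrete with a small model. This is a minimal sketch under the assumption of a single partition; `PartitionCursor` and its methods are hypothetical names that only mimic what `poll`, a commit, and a restart do to the two counters.

```java
// Hypothetical single-partition model; not part of the Kafka client.
final class PartitionCursor {
    private long position = 0;   // offset of the next record that will be handed out
    private long committed = 0;  // offset the consumer resumes from after a restart

    /** Mimics poll() returning `count` records: the position advances automatically. */
    void poll(int count) {
        position += count;
    }

    /** Mimics a commit of everything returned so far. */
    void commit() {
        committed = position;
    }

    /** Mimics a crash and restart: the consumer recovers to the committed position. */
    void restart() {
        position = committed;
    }

    long position() {
        return position;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        PartitionCursor cursor = new PartitionCursor();
        cursor.poll(5);      // records with offsets 0 through 4
        cursor.commit();     // committed position is now 5
        cursor.poll(3);      // offsets 5 through 7, never committed
        cursor.restart();
        System.out.println(cursor.position()); // 5: offsets 5 through 7 are read again
    }
}
```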

## Subscribing to Topics

- The consumer can subscribe to one or more topics
- The `subscribe()` method takes a list of topics as a parameter

```java
consumer.subscribe(Collections.singletonList("customerCountries"));
```

- It is also possible to call `subscribe` with a regular expression, which can match multiple topic names; and if a new topic is created with a name that matches, a rebalance will happen almost immediately and the consumers will start consuming from the new topic

```java
// subscribe() takes a java.util.regex.Pattern, not a String
consumer.subscribe(Pattern.compile("my.topic.*"));
```
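Because the expression is an ordinary Java regular expression, an unescaped `.` matches any character, so `my.topic.*` may match more topic names than intended. The sketch below checks a few made-up topic names against a loose and an escaped pattern; it assumes the whole topic name must match the pattern, and `TopicPatternCheck` is a hypothetical helper rather than anything in the Kafka client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; topic names are made up.
final class TopicPatternCheck {
    /** Returns the topic names that the pattern matches in full. */
    static List<String> matching(Pattern pattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList(
                "my.topic.orders", "my-topic-audit", "mytopic", "other.topic");

        // Unescaped dots match any character, so "my-topic-audit" also matches
        System.out.println(matching(Pattern.compile("my.topic.*"), topics));
        // prints [my.topic.orders, my-topic-audit]

        // Escaped dots match only literal dots
        System.out.println(matching(Pattern.compile("my\\.topic\\..*"), topics));
        // prints [my.topic.orders]
    }
}
```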

## The Poll Loop

- The core of the consumer API is a loop for polling the server for more data
- Once the consumer subscribes to topics, the poll loop handles all details of coordination, partition rebalances, heartbeats, and data fetching
- The first time `poll` is called with a new consumer, it is responsible for finding the `GroupCoordinator`, joining the consumer group, and receiving a partition assignment
- If a rebalance is triggered, it will be handled inside the poll loop
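- In older clients the heartbeats that keep a consumer alive were sent from within the poll loop; since Kafka 0.10.1 (including the 2.3.0 client used here) they are sent from a background thread, but the consumer must still call `poll` at least once every `max.poll.interval.ms` or it is removed from the group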


In [11]:
package kafka_definitive_guide.consumer;

import java.util.Properties;
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:19092");
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CountryCounter");
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);

consumer.subscribe(Collections.singletonList("CustomerCountry"));

Duration pollingTimeout = Duration.ofMillis(100);

try {
    // Consumers are usually long-running applications that continuously
    // poll Kafka for more data
    while (true) {
        // Consumers must keep polling Kafka or they will be considered dead
        // and the partitions they are consuming will be handed to another consumer
        // in the group to continue consuming
        ConsumerRecords<String, String> records = consumer.poll(pollingTimeout);
        // poll returns a list of records, each record contains the topic and 
        // partition the record came from, the offset of the record within the partition
        // and the key-value pair of the record
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}

ERROR:  org.apache.kafka.common.errors.InterruptException

## Configuring Consumers

- `fetch.min.bytes`: the minimum amount of data that a consumer wants to receive from the broker when fetching records
    - Used to reduce the load and the number of round trips between the consumer and the broker
    - Works together with `fetch.max.wait.ms`, which controls how long the broker waits before responding when `fetch.min.bytes` of data is not yet available


- `max.partition.fetch.bytes`: the maximum number of bytes the server will return per partition
    - when `KafkaConsumer.poll()` returns `ConsumerRecords`, the record object will use at most `max.partition.fetch.bytes` per partition assigned to the consumer
    - should be at least as large as the largest message the broker will accept (the broker setting `message.max.bytes`)
    - the consumer must call `poll()` frequently enough to avoid being considered dead, so if this value is too large, it might take too long to process the messages before the next poll


- `session.timeout.ms`: the amount of time a consumer can be out of contact with the brokers while still considered alive
     - if more than `session.timeout.ms` passes without the consumer sending a heartbeat to the group coordinator, it is considered dead and the group coordinator will trigger a rebalance of the consumer group to allocate partitions from the dead consumer to other consumers
     - also `heartbeat.interval.ms`, which controls how frequently the consumer sends a heartbeat to the group coordinator; it is usually set to one-third of `session.timeout.ms`


- `auto.offset.reset`: controls the behavior of the consumer when it starts reading a partition for which it doesn't have a committed offset or if the committed offset it has is invalid
    - defaults to *latest*, which means that lacking a valid offset, the consumer will start reading from the newest records
    - the alternative is *earliest*, which means the consumer will read all the data in the partition, starting from the very beginning


- `enable.auto.commit`: controls whether the consumer will commit offsets automatically
    - defaults to `true`; set it to `false` if you prefer to control when offsets are committed
    - also `auto.commit.interval.ms` controls how frequently offsets will be committed


- `partition.assignment.strategy`: given consumers and topics they subscribed to, decides which partitions will be assigned to which consumer
    - `RangeAssignor`: assign to each consumer a consecutive subset of partitions from each topic it subscribes to
    - `RoundRobinAssignor`: takes all partitions from all subscribed topics and assigns them to consumers sequentially, one by one


- `client.id`: used by the brokers to identify messages sent from the client, used in logging, metrics, and quotas


- `max.poll.records`: controls the maximum number of records that a single call to `poll` will return


- `receive.buffer.bytes` & `send.buffer.bytes`: sizes of the TCP receive and send buffers used by the sockets when reading and writing data
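The difference between the two assignment strategies listed above is easiest to see on a small example. The following is a simplified sketch, not the real `RangeAssignor` or `RoundRobinAssignor`: it assumes every consumer subscribes to every topic, and the class name, consumer names `C1`/`C2`, and topic names `T1`/`T2` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the two strategies; assumes all consumers subscribe to all topics.
final class AssignmentSketch {
    private static Map<String, List<String>> emptyPlan(List<String> consumers) {
        Map<String, List<String>> plan = new LinkedHashMap<>();
        for (String consumer : consumers) {
            plan.put(consumer, new ArrayList<>());
        }
        return plan;
    }

    /** Range: each topic is split on its own into consecutive chunks. */
    static Map<String, List<String>> range(List<String> consumers, Map<String, Integer> topics) {
        Map<String, List<String>> plan = emptyPlan(consumers);
        for (Map.Entry<String, Integer> topic : topics.entrySet()) {
            int perConsumer = topic.getValue() / consumers.size();
            int extra = topic.getValue() % consumers.size();
            int next = 0;
            for (int i = 0; i < consumers.size(); i++) {
                // The first `extra` consumers get one additional partition
                int share = perConsumer + (i < extra ? 1 : 0);
                for (int k = 0; k < share; k++) {
                    plan.get(consumers.get(i)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return plan;
    }

    /** Round robin: partitions of all topics are dealt out one by one. */
    static Map<String, List<String>> roundRobin(List<String> consumers, Map<String, Integer> topics) {
        Map<String, List<String>> plan = emptyPlan(consumers);
        int turn = 0;
        for (Map.Entry<String, Integer> topic : topics.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                plan.get(consumers.get(turn++ % consumers.size())).add(topic.getKey() + "-" + p);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("C1", "C2");
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("T1", 3); // topic name -> partition count
        topics.put("T2", 3);
        System.out.println(range(consumers, topics));
        // prints {C1=[T1-0, T1-1, T2-0, T2-1], C2=[T1-2, T2-2]}
        System.out.println(roundRobin(consumers, topics));
        // prints {C1=[T1-0, T1-2, T2-1], C2=[T1-1, T2-0, T2-2]}
    }
}
```

With an odd partition count per topic, the range strategy gives the first consumer the extra partition of every topic, while round robin keeps the totals within one partition of each other.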


## Commits and Offsets

- Kafka does not track acknowledgements from consumers; instead, it allows consumers to use Kafka to track their position (offset) in each partition
- The action of updating the current position in the partition is called a _commit_
- The consumer commits an offset by producing a message containing the committed offset for each partition to a special `__consumer_offsets` topic
- If a consumer crashes or a new consumer joins the consumer group, a rebalance is triggered, after which each consumer may be assigned a different set of partitions from the one it processed before; to know where to pick up the work, the consumer reads the latest committed offset of each partition and continues from there


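- Depending on where the committed offset sits relative to the last processed message, a consumer can re-process or miss messages after a rebalance: if the committed offset is smaller than the offset of the last processed message, the messages in between are processed twice; if it is larger, the messages in between are skipped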

<img src="img/Snip20200425_10.png"/>
<img src="img/Snip20200425_11.png"/>


### Automatic Commit

- By default, `enable.auto.commit` is set to true, and every `auto.commit.interval.ms` (5 seconds by default) the consumer commits the largest offset the client received from `poll`
- This commit is driven by the poll loop: whenever `poll` is called, the consumer checks whether it is time to commit, and if it is, it commits the offsets returned by the last poll
- Automatic commits cannot completely eliminate duplicate records: the last committed offset can be up to `auto.commit.interval.ms` old, so after a crash or rebalance the records processed since that commit are delivered again
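The size of the duplicate window can be estimated with simple arithmetic. The sketch below is a toy timeline under stated assumptions: polls happen at a fixed interval, each returns a fixed number of records, a commit happens at the start of a poll once the timer has expired, and the consumer crashes right after the poll at `crashAtMs`. `AutoCommitSketch` and its parameters are hypothetical names.

```java
// Toy timeline of timer-driven commits; not part of the Kafka client.
final class AutoCommitSketch {
    /** Returns how many already-delivered records are delivered again after the crash. */
    static long redelivered(long pollEveryMs, int recordsPerPoll,
                            long autoCommitIntervalMs, long crashAtMs) {
        long position = 0;      // next offset to fetch
        long committed = 0;     // last committed offset
        long lastCommitAt = 0;  // time of the last commit
        for (long now = 0; now <= crashAtMs; now += pollEveryMs) {
            // On each poll, commit what earlier polls returned if the timer expired
            if (now - lastCommitAt >= autoCommitIntervalMs) {
                committed = position;
                lastCommitAt = now;
            }
            position += recordsPerPoll; // records handed to the application
        }
        // Everything between the committed offset and the position is read again
        return position - committed;
    }

    public static void main(String[] args) {
        // Poll every second, 10 records per poll, commit every 5 seconds,
        // crash 3 seconds after the most recent commit
        System.out.println(redelivered(1000, 10, 5000, 8000)); // 40
    }
}
```

Lowering the commit interval shrinks the window but never closes it, which is the motivation for committing explicitly.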




### Commit Current Offset

- To eliminate the possibility of missing messages and to reduce the number of messages duplicated during rebalancing, the consumer API has the option of committing the current offset at some point rather than based on a timer
- By setting `enable.auto.commit` to false, offsets will only be committed when the application explicitly chooses to do so


- `commitSync()`: commit the latest offset returned by `poll()` and return once the offset is committed, throwing an exception if commit fails for some reason



In [None]:
package kafka_definitive_guide.consumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:19092");
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CountryCounter");
// Disable automatic commits so that offsets are committed only by commitSync() below
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);

consumer.subscribe(Collections.singletonList("CustomerCountry"));

Duration pollingTimeout = Duration.ofMillis(100);

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(pollingTimeout);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            // necessary error handling
        }
    }
} finally {
    consumer.close();
}

### Asynchronous Commit

- Synchronous commit blocks the application until the broker responds to the commit request, which limits the throughput of the application; throughput can be improved by committing less frequently, but then the number of potential duplicates goes up


- `commitAsync`: send the commit request in a non-blocking way
    - unlike `commitSync`, `commitAsync` will not retry, because by the time it receives a response from the broker, a later commit may already have succeeded
    - `commitAsync` also gives you the option to pass in a callback that is triggered when the broker responds; commonly the callback is used to log commit errors or to count them in a metric
    - optionally, one can retry in the commit callback with the help of a monotonically increasing sequence number: increase the sequence number on every commit and capture its value in the callback; when a commit fails, retry only if the captured value still equals the current sequence number, which means no newer commit has been sent
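The sequence-number guard described in the last bullet can be sketched independently of the Kafka client. `GuardedRetry` is a hypothetical helper: in a real consumer, `nextCommit()` would be called just before `commitAsync`, and `shouldRetry()` from the failure branch of the callback with the value captured at send time.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper for illustration only; not part of the Kafka client.
final class GuardedRetry {
    private final AtomicLong commitSeq = new AtomicLong();

    /** Call when sending a commit; returns the sequence number the callback should capture. */
    long nextCommit() {
        return commitSeq.incrementAndGet();
    }

    /** Call when a commit fails; true only if no newer commit has been sent since. */
    boolean shouldRetry(long seqAtSend) {
        return seqAtSend == commitSeq.get();
    }

    public static void main(String[] args) {
        GuardedRetry guard = new GuardedRetry();
        long first = guard.nextCommit();   // commit #1 is sent
        long second = guard.nextCommit();  // commit #2 is sent before #1's callback fires
        System.out.println(guard.shouldRetry(first));  // false: a newer commit supersedes it
        System.out.println(guard.shouldRetry(second)); // true: still the latest commit
    }
}
```

Retrying the stale commit would risk overwriting a newer committed offset with an older one, which is why the guard refuses it.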


In [None]:
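import java.util.Map;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

// Assumes `consumer` and `pollingTimeout` are set up as in the commitSync example
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(pollingTimeout);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                   Exception e) {
                // Report the failure; commitAsync does not retry on its own
                if (e != null) {
                    System.err.printf("Commit failed for offsets %s: %s%n", offsets, e);
                }
            }
        });
    }
} finally {
    consumer.close();
}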

### Combining Synchronous and Asynchronous Commits

- Normally, an occasional failed commit without a retry is not a problem: if the failure was transient, a later commit will succeed
- If we know that this is the last commit before we close the consumer, or before a rebalance, we want to make sure that the commit succeeds
- A common pattern is to combine `commitAsync` with `commitSync` just before shutdown

In [None]:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(pollingTimeout);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitAsync();
    }
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

### Commit Specified Offset

- `commitSync()` and `commitAsync()` optionally accept a map of partitions and offsets to commit, which allows committing in the middle of a batch rather than only the latest offset returned by `poll()`

In [None]:
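import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

// Assumes `consumer` is created and subscribed as in the earlier examples
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // Track the offset of the next record to read, hence offset + 1
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                           new OffsetAndMetadata(record.offset() + 1, "no meta"));
        // Commit every 1000 records instead of waiting for the end of the batch
        if (count++ % 1000 == 0) {
            consumer.commitAsync(currentOffsets, null);
        }
    }
}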

## Rebalance Listeners

- A consumer will want to do some cleanup work before exiting and also before partition rebalancing; if the consumer is about to lose ownership of a partition, you will want to commit offsets of the last event you've processed


- The consumer API allows you to run code when partitions are added to or removed from the consumer
- `subscribe()` takes a `ConsumerRebalanceListener` as a parameter
    - `public void onPartitionsRevoked(Collection<TopicPartition> partitions)`: called before the rebalancing starts and after the consumer has stopped consuming messages; this is where the consumer should commit offsets
    - `public void onPartitionsAssigned(Collection<TopicPartition> partitions)`: called after partitions have been reassigned to the consumer, but before the consumer starts consuming messages




In [11]:
package kafka_definitive_guide.consumer;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.clients.consumer.*;

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

class HandleRebalance implements ConsumerRebalanceListener {
    private Consumer<?, ?> consumer;
    
    public HandleRebalance(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }
    
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // no-op
    }
    
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(currentOffsets);
    }
}

null

## Consuming Records with Specific Offsets

- `seekToBeginning(Collection<TopicPartition> partitions)`: read all messages from the beginning of the given partitions
- `seekToEnd(Collection<TopicPartition> partitions)`: skip all the way to the end of the given partitions and consume only new messages
- `seek(TopicPartition partition, long offset)`: start reading from a specific offset


In [None]:
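import java.time.Duration;
import java.util.Collection;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

// `topics`, getOffsetFromDB, commitDBTransaction, processRecord, saveRecordToDB and
// saveOffsetToDB are placeholders for application-specific code
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;

    public SaveOffsetsOnRebalance(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Resume each newly assigned partition from the offset stored in the database
        for (TopicPartition partition: partitions) {
            consumer.seek(partition, getOffsetFromDB(partition));
        }
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Persist processed records and their offsets before losing ownership
        commitDBTransaction();
    }
}

consumer.subscribe(topics, new SaveOffsetsOnRebalance(consumer));
// Ensure we join a consumer group and get assigned partitions; the deprecated
// poll(long) is kept here because, unlike poll(Duration), it blocks until the group is joined
consumer.poll(0);
for (TopicPartition partition: consumer.assignment()) {
    // then immediately seek to the correct offset in the partitions assigned
    consumer.seek(partition, getOffsetFromDB(partition));
}

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
        saveRecordToDB(record);
        saveOffsetToDB(record.topic(), record.partition(), record.offset());
    }
    // Store the records and their offsets atomically, in one transaction
    commitDBTransaction();
}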

## Exiting from the Poll Loop

- Call `consumer.wakeup()` from another thread; this causes `poll()` to exit with a `WakeupException`. The exception does not need to be handled, but `consumer.close()` must be invoked before the thread exits

- If the consumer loop is running in the main thread, `wakeup()` can be called from a shutdown hook

In [None]:
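import java.time.Duration;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;

// Assumes `consumer` is created and subscribed as in the earlier examples
final Thread mainThread = Thread.currentThread();

Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        // The shutdown hook runs in a separate thread, so it only calls wakeup()
        consumer.wakeup();
        try {
            // Wait for the poll loop to exit and close the consumer
            mainThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // Expected during shutdown; nothing to handle
} finally {
    consumer.close();
}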

## Deserializers

- Kafka consumers require *deserializers* to convert byte arrays from Kafka into Java objects
- Serializers in producers and deserializers in consumers must match; one of the benefits of using Avro and the Schema Registry for serializing and deserializing is that the `AvroSerializer` can ensure that all the data written to a specific topic is compatible with the schema of the topic, which means it can be deserialized with the matching deserializer and schema
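To illustrate why the two sides must agree, here is a minimal sketch of a hand-rolled byte layout and its matching reader. The `Customer` type and the layout (4-byte id, 4-byte name length, UTF-8 name) are assumptions made for this example. `CustomerCodec.deserialize` only mirrors the shape of a Kafka deserializer's `deserialize(String topic, byte[] data)` method and does not implement the Kafka interface, so it runs without the client library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for illustration only; not a Kafka Serializer/Deserializer.
final class CustomerCodec {
    static final class Customer {
        final int id;
        final String name;

        Customer(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Writes: 4-byte id, 4-byte name length, then the UTF-8 bytes of the name. */
    static byte[] serialize(Customer customer) {
        byte[] name = customer.name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length);
        buffer.putInt(customer.id);
        buffer.putInt(name.length);
        buffer.put(name);
        return buffer.array();
    }

    /** Reads the layout written by serialize(); the topic argument is unused here. */
    static Customer deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new IllegalArgumentException("Payload is shorter than expected");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int id = buffer.getInt();
        int nameLength = buffer.getInt();
        if (nameLength < 0 || nameLength > buffer.remaining()) {
            throw new IllegalArgumentException("Invalid name length: " + nameLength);
        }
        byte[] name = new byte[nameLength];
        buffer.get(name);
        return new Customer(id, new String(name, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] payload = serialize(new Customer(7, "Zoë"));
        Customer decoded = deserialize("customers", payload);
        System.out.println(decoded.id + " " + decoded.name); // 7 Zoë
    }
}
```

Any change to the layout on the producer side breaks every consumer that still reads the old layout, which is the compatibility problem a schema-based format is meant to address.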

## Standalone Consumer without a Consumer Group

- Occasionally you only need a single consumer that always reads data from all the partitions in a topic, or from a specific partition in a topic. In that case there is no reason for groups or rebalances: just **assign** the consumer specific topics and/or partitions, consume messages, and commit offsets


- A consumer can either subscribe to topics and be part of a consumer group, or assign itself partitions, but not both at the same time



In [1]:
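import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Assumes `consumer` is a KafkaConsumer<String, String> created as in the earlier examples
Duration pollingTimeout = Duration.ofMillis(100);

// Ask the cluster which partitions exist for the topic
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
List<TopicPartition> partitions = new ArrayList<>();

if (partitionInfos != null) {
    for (PartitionInfo partition: partitionInfos) {
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
    // Assign the partitions directly instead of subscribing; no group rebalance is involved
    consumer.assign(partitions);
    
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(pollingTimeout);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
}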

