public List<PartitionInfo> partitionsFor(String topic);
+
+ /**
+ * @see KafkaConsumer#close()
*/
public void close();
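
A hedged sketch (not part of the patch) of the close() contract referenced above: the consumer should always be closed when no longer needed so its network connections are released. The broker address, group id and topic name below are placeholders, and the constructor and subscribe(String...) signatures are the ones shown later in this patch.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CloseExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "test");                    // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            consumer.subscribe("my-topic"); // placeholder topic
            // ... poll and process records ...
        } finally {
            // Failing to close the consumer leaks its TCP connections.
            consumer.close();
        }
    }
}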
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 57c1807ccba9..6d4ff7cd2a28 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -9,13 +9,16 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
-*/
+ */
package org.apache.kafka.clients.consumer;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import java.util.Map;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceCallback;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -27,130 +30,121 @@
public class ConsumerConfig extends AbstractConfig {
private static final ConfigDef config;
- /**
- * The identifier of the group this consumer belongs to. This is required if the consumer uses either the
- * group management functionality by using {@link Consumer#subscribe(String...) subscribe(topics)}. This is also required
- * if the consumer uses the default Kafka based offset management strategy.
+ /*
+ * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
+ * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
*/
- public static final String GROUP_ID_CONFIG = "group.id";
-
+
/**
- * The timeout after which, if the {@link Consumer#poll(long) poll(timeout)} is not invoked, the consumer is
- * marked dead and a rebalance operation is triggered for the group identified by {@link #GROUP_ID_CONFIG}. Relevant
- * if the consumer uses the group management functionality by invoking {@link Consumer#subscribe(String...) subscribe(topics)}
+ * group.id
*/
- public static final String SESSION_TIMEOUT_MS = "session.timeout.ms";
+ public static final String GROUP_ID_CONFIG = "group.id";
+ private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.";
/**
- * The number of times a consumer sends a heartbeat to the co-ordinator broker within a {@link #SESSION_TIMEOUT_MS} time window.
- * This frequency affects the latency of a rebalance operation since the co-ordinator broker notifies a consumer of a rebalance
- * in the heartbeat response. Relevant if the consumer uses the group management functionality by invoking
- * {@link Consumer#subscribe(String...) subscribe(topics)}
+ * session.timeout.ms
*/
- public static final String HEARTBEAT_FREQUENCY = "heartbeat.frequency";
+ public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+ private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
/**
- * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form
- * host1:port1,host2:port2,... These urls are just used for the initial connection to discover the
- * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you
- * may want more than one, though, in case a server is down).
+ * bootstrap.servers
*/
- public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
/**
- * If true, periodically commit to Kafka the offsets of messages already returned by the consumer. This committed
- * offset will be used when the process fails as the position from which the consumption will begin.
+ * enable.auto.commit
*/
public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
-
+ private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";
+
/**
- * The friendly name of the partition assignment strategy that the server will use to distribute partition ownership
- * amongst consumer instances when group management is used
+ * auto.commit.interval.ms
*/
- public static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy";
-
+ public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
+ private static final String AUTO_COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.";
+
/**
- * The frequency in milliseconds that the consumer offsets are committed to Kafka. Relevant if {@link #ENABLE_AUTO_COMMIT_CONFIG}
- * is turned on.
+ * partition.assignment.strategy
*/
- public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
-
+ public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
+ private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The friendly name of the partition assignment strategy that the server will use to distribute partition ownership amongst consumer instances when group management is used";
+
/**
- * What to do when there is no initial offset in Kafka or if an offset is out of range:
- * - smallest: automatically reset the offset to the smallest offset
- * - largest: automatically reset the offset to the largest offset
- * - disable: throw exception to the consumer if no previous offset is found for the consumer's group
- * - anything else: throw exception to the consumer.
- *
+ * auto.offset.reset
*/
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
-
+ private static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; none: throw an exception to the consumer if no previous offset is found for the consumer's group; anything else: throw an exception to the consumer.";
+
/**
- * The minimum amount of data the server should return for a fetch request. If insufficient data is available the
- * request will wait for that much data to accumulate before answering the request.
+ * fetch.min.bytes
*/
public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
-
+ private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";
+
/**
- * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient
- * data to immediately satisfy {@link #FETCH_MIN_BYTES_CONFIG}. This should be less than or equal to the timeout used in
- * {@link KafkaConsumer#poll(long) poll(timeout)}
+ * fetch.max.wait.ms
*/
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
-
+ private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
+
+ /** metadata.max.age.ms */
+ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+
/**
- * The maximum amount of time to block waiting to fetch metadata about a topic the first time a record is received
- * from that topic. The consumer will throw a TimeoutException if it could not successfully fetch metadata within
- * this timeout.
+ * max.partition.fetch.bytes
*/
- public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
+ public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
+ private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be #partitions * max.partition.fetch.bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
+
+ /** send.buffer.bytes */
+ public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
+
+ /** receive.buffer.bytes */
+ public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
/**
- * The total memory used by the consumer to buffer records received from the server. This config is meant to control
- * the consumer's memory usage, so it is the size of the global fetch buffer that will be shared across all partitions.
+ * client.id
*/
- public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
+ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
/**
- * The minimum amount of memory that should be used to fetch at least one message for a partition. This puts a lower
- * bound on the consumer's memory utilization when there is at least one message for a partition available on the server.
- * This size must be at least as large as the maximum message size the server allows or else it is possible for the producer
- * to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large
- * message on a certain partition.
+ * reconnect.backoff.ms
*/
- public static final String FETCH_BUFFER_CONFIG = "fetch.buffer.bytes";
-
+ public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
+
/**
- * The id string to pass to the server when making requests. The purpose of this is to be able to track the source
- * of requests beyond just ip/port by allowing a logical application name to be included.
+ * retry.backoff.ms
*/
- public static final String CLIENT_ID_CONFIG = "client.id";
+ public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
/**
- * The size of the TCP send buffer to use when fetching data
+ * metrics.sample.window.ms
*/
- public static final String SOCKET_RECEIVE_BUFFER_CONFIG = "socket.receive.buffer.bytes";
+ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
/**
- * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a
- * host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
+ * metrics.num.samples
*/
- public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
+ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
- /** metrics.sample.window.ms */
- public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
- private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. "
- + "When a window expires we erase and overwrite the oldest window.";
-
- /** metrics.num.samples */
- public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
- private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
+ /**
+ * metric.reporters
+ */
+ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
- /** metric.reporters */
- public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
- private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
+ /**
+ * rebalance.callback.class
+ */
+ public static final String CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG = "rebalance.callback.class";
+ private static final String CONSUMER_REBALANCE_CALLBACK_CLASS_DOC = "A user-provided callback to execute when partition assignments change.";
+ /**
+ * check.crcs
+ */
+ public static final String CHECK_CRCS_CONFIG = "check.crcs";
+ private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";
+
/** key.deserializer */
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the Deserializer interface.";
@@ -160,38 +154,134 @@ public class ConsumerConfig extends AbstractConfig {
private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the Deserializer interface.";
static {
- /* TODO: add config docs */
- config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah")
- .define(GROUP_ID_CONFIG, Type.STRING, Importance.HIGH, "blah blah")
- .define(SESSION_TIMEOUT_MS, Type.LONG, 1000, Importance.HIGH, "blah blah")
- .define(HEARTBEAT_FREQUENCY, Type.INT, 3, Importance.MEDIUM, "blah blah")
- .define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, Importance.MEDIUM, "blah blah")
- .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, "blah blah")
- .define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, "blah blah")
- .define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 5000, atLeast(0), Importance.LOW, "blah blah")
- .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, "blah blah")
- .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.LOW, "blah blah")
- .define(FETCH_BUFFER_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.HIGH, "blah blah")
- .define(SOCKET_RECEIVE_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.LOW, "blah blah")
- .define(FETCH_MIN_BYTES_CONFIG, Type.LONG, 1024, atLeast(0), Importance.HIGH, "blah blah")
- .define(FETCH_MAX_WAIT_MS_CONFIG, Type.LONG, 500, atLeast(0), Importance.LOW, "blah blah")
- .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, "blah blah")
- .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "largest", Importance.MEDIUM, "blah blah")
+ config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
+ Type.LIST,
+ Importance.HIGH,
+ CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+ .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
+ .define(SESSION_TIMEOUT_MS_CONFIG,
+ Type.LONG,
+ 30000,
+ Importance.HIGH,
+ SESSION_TIMEOUT_MS_DOC)
+ .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+ Type.STRING,
+ "blah",
+ Importance.MEDIUM,
+ PARTITION_ASSIGNMENT_STRATEGY_DOC)
+ .define(METADATA_MAX_AGE_CONFIG,
+ Type.LONG,
+ 5 * 60 * 1000,
+ atLeast(0),
+ Importance.LOW,
+ CommonClientConfigs.METADATA_MAX_AGE_DOC)
+ .define(ENABLE_AUTO_COMMIT_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.MEDIUM,
+ ENABLE_AUTO_COMMIT_DOC)
+ .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
+ Type.LONG,
+ 5000,
+ atLeast(0),
+ Importance.LOW,
+ AUTO_COMMIT_INTERVAL_MS_DOC)
+ .define(CLIENT_ID_CONFIG,
+ Type.STRING,
+ "",
+ Importance.LOW,
+ CommonClientConfigs.CLIENT_ID_DOC)
+ .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
+ Type.INT,
+ 1 * 1024 * 1024,
+ atLeast(0),
+ Importance.HIGH,
+ MAX_PARTITION_FETCH_BYTES_DOC)
+ .define(SEND_BUFFER_CONFIG,
+ Type.INT,
+ 128 * 1024,
+ atLeast(0),
+ Importance.MEDIUM,
+ CommonClientConfigs.SEND_BUFFER_DOC)
+ .define(RECEIVE_BUFFER_CONFIG,
+ Type.INT,
+ 32 * 1024,
+ atLeast(0),
+ Importance.MEDIUM,
+ CommonClientConfigs.RECEIVE_BUFFER_DOC)
+ .define(FETCH_MIN_BYTES_CONFIG,
+ Type.INT,
+ 1024,
+ atLeast(0),
+ Importance.HIGH,
+ FETCH_MIN_BYTES_DOC)
+ .define(FETCH_MAX_WAIT_MS_CONFIG,
+ Type.INT,
+ 500,
+ atLeast(0),
+ Importance.LOW,
+ FETCH_MAX_WAIT_MS_DOC)
+ .define(RECONNECT_BACKOFF_MS_CONFIG,
+ Type.LONG,
+ 50L,
+ atLeast(0L),
+ Importance.LOW,
+ CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+ .define(RETRY_BACKOFF_MS_CONFIG,
+ Type.LONG,
+ 100L,
+ atLeast(0L),
+ Importance.LOW,
+ CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+ .define(AUTO_OFFSET_RESET_CONFIG,
+ Type.STRING,
+ "latest",
+ in("latest", "earliest", "none"),
+ Importance.MEDIUM,
+ AUTO_OFFSET_RESET_DOC)
+ .define(CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
+ Type.CLASS,
+ NoOpConsumerRebalanceCallback.class,
+ Importance.LOW,
+ CONSUMER_REBALANCE_CALLBACK_CLASS_DOC)
+ .define(CHECK_CRCS_CONFIG,
+ Type.BOOLEAN,
+ true,
+ Importance.LOW,
+ CHECK_CRCS_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,
atLeast(0),
Importance.LOW,
- METRICS_SAMPLE_WINDOW_MS_DOC)
- .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
- .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
- .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
- .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC);
-
+ CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+ .define(METRICS_NUM_SAMPLES_CONFIG,
+ Type.INT,
+ 2,
+ atLeast(1),
+ Importance.LOW,
+ CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+ .define(METRIC_REPORTER_CLASSES_CONFIG,
+ Type.LIST,
+ "",
+ Importance.LOW,
+ CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+ .define(KEY_DESERIALIZER_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.HIGH,
+ KEY_DESERIALIZER_CLASS_DOC)
+ .define(VALUE_DESERIALIZER_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.HIGH,
+ VALUE_DESERIALIZER_CLASS_DOC);
}
ConsumerConfig(Map<? extends Object, ? extends Object> props) {
super(config, props);
}
+ public static void main(String[] args) {
+ System.out.println(config.toHtmlTable());
+ }
+
}
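
As a hedged illustration (not part of the patch) of how the renamed constants above would typically be used: building a consumer configuration through the public constants instead of raw strings guards against typos, and the new main() method can be run to regenerate the HTML documentation table. The broker address and group id below are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConfigExample {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");                    // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // "earliest" is one of the values accepted by the new in("latest", "earliest", "none") validator.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}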
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
index e4cf7d1cfa01..74dfdba0ecbc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
@@ -9,7 +9,7 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
-*/
+ */
package org.apache.kafka.clients.consumer;
import java.util.Collection;
@@ -17,34 +17,77 @@
import org.apache.kafka.common.TopicPartition;
/**
- * A callback interface that the user can implement to manage customized offsets on the start and end of
- * every rebalance operation. This callback will execute in the user thread as part of the
- * {@link Consumer#poll(long) poll(long)} API on every rebalance attempt.
- * Default implementation of the callback will {@link Consumer#seek(java.util.Map) seek(offsets)} to the last committed offsets in the
- * {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned()} callback. And will commit offsets synchronously
- * for the specified list of partitions to Kafka in the {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked()}
- * callback.
+ * A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the
+ * consumer changes.
+ *
+ * This is applicable when the consumer has Kafka automatically manage group membership; if the consumer directly subscribes to partitions,
+ * those partitions will never be reassigned and this callback is not applicable.
+ *
+ * When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription
+ * of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure.
+ *
+ * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
+ * the {@link #onPartitionsRevoked(Consumer, Collection)} call we can ensure that any time partition assignment changes
+ * the offset gets saved.
+ *
+ * Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
+ * consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the
+ * number of page views per user for each five-minute window. Let's say the topic is partitioned by the user id so that
+ * all events for a particular user will go to a single consumer instance. The consumer can keep in memory a running
+ * tally of actions per user and only flush these out to a remote data store when its cache gets too big. However if a
+ * partition is reassigned it may want to automatically trigger a flush of this cache before the new owner takes over
+ * consumption.
+ *
+ * This callback will execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes.
+ *
+ * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} prior to
+ * any process invoking {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
+ * {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
+ * partition has their {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned} callback called to load the state.
+ *
+ * Here is pseudo-code for a callback implementation for saving offsets:
+ *
+ * {@code
+ * public class SaveOffsetsOnRebalance implements ConsumerRebalanceCallback {
+ * public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ * // read the offsets from an external store using some custom code not described here
+ * for(TopicPartition partition: partitions)
+ * consumer.position(partition, readOffsetFromExternalStore(partition));
+ * }
+ * public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ * // save the offsets in an external store using some custom code not described here
+ * for(TopicPartition partition: partitions)
+ * saveOffsetInExternalStore(consumer.position(partition));
+ * }
+ * }
+ * }
+ *
*/
public interface ConsumerRebalanceCallback {
/**
- * A callback method the user can implement to provide handling of customized offsets on completion of a successful
- * rebalance operation. This method will be called after a rebalance operation completes and before the consumer
- * starts fetching data.
- *
- * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
- * @param partitions The list of partitions that are assigned to the consumer after rebalance
+ * A callback method the user can implement to provide handling of customized offsets on completion of a successful
+ * partition re-assignment. This method will be called after the partition re-assignment completes and before the
+ * consumer starts fetching data.
+ *
+ * It is guaranteed that all the processes in a consumer group will execute their
+ * {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
+ * {@link #onPartitionsAssigned(Consumer, Collection)} callback.
+ *
+ * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
+ * assigned to the consumer)
*/
- public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions);
-
+ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+
/**
- * A callback method the user can implement to provide handling of offset commits to a customized store on the
- * start of a rebalance operation. This method will be called before a rebalance operation starts and after the
- * consumer stops fetching data. It is recommended that offsets should be committed in this callback to
- * either Kafka or a custom offset store to prevent duplicate data
- *
- * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
+ * A callback method the user can implement to provide handling of offset commits to a customized store on the start
+ * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
+ * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
+ * custom offset store to prevent duplicate data
+ *
+ * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
+ *
* @param partitions The list of partitions that were assigned to the consumer on the last rebalance
*/
- public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions);
+ public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
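
A hedged sketch (not part of the patch) of a callback implementation for the "flush intermediate results" use case described in the class javadoc above. flushCache is a hypothetical application method, not part of this API; the signatures match the interface as revised here.

import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
import org.apache.kafka.common.TopicPartition;

// Flushes an application-level cache whenever partition ownership changes.
public class FlushCacheOnRebalance implements ConsumerRebalanceCallback {
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Nothing to restore here; the in-memory state is rebuilt as records are consumed.
    }

    @Override
    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Write out buffered results before another consumer takes over these partitions.
        flushCache(partitions);
    }

    private void flushCache(Collection<TopicPartition> partitions) {
        // hypothetical helper: persist any per-partition intermediate results to the external store
    }
}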
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 16af70a5de52..466254e81c32 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -9,119 +9,76 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
-*/
+ */
package org.apache.kafka.clients.consumer;
-import org.apache.kafka.common.TopicPartition;
-
/**
- * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the
- * record is being received and an offset that points to the record in a Kafka partition.
+ * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the
+ * record is being received and an offset that points to the record in a Kafka partition.
*/
-public final class ConsumerRecord<K, V> {
- private final TopicPartition partition;
+public final class ConsumerRecord<K, V> {
+ private final String topic;
+ private final int partition;
+ private final long offset;
private final K key;
private final V value;
- private final long offset;
- private volatile Exception error;
-
- /**
- * Creates a record to be received from a specified topic and partition
- *
- * @param topic The topic this record is received from
- * @param partitionId The partition of the topic this record is received from
- * @param key The key of the record, if one exists
- * @param value The record contents
- * @param offset The offset of this record in the corresponding Kafka partition
- */
- public ConsumerRecord(String topic, int partitionId, K key, V value, long offset) {
- this(topic, partitionId, key, value, offset, null);
- }
/**
* Create a record with no key
*
* @param topic The topic this record is received from
- * @param partitionId The partition of the topic this record is received from
- * @param value The record contents
+ * @param partition The partition of the topic this record is received from
* @param offset The offset of this record in the corresponding Kafka partition
+ * @param value The record contents
*/
- public ConsumerRecord(String topic, int partitionId, V value, long offset) {
- this(topic, partitionId, null, value, offset);
- }
-
- /**
- * Creates a record with an error code
- * @param topic The topic this record is received from
- * @param partitionId The partition of the topic this record is received from
- * @param error The exception corresponding to the error code returned by the server for this topic partition
- */
- public ConsumerRecord(String topic, int partitionId, Exception error) {
- this(topic, partitionId, null, null, -1L, error);
- }
-
- private ConsumerRecord(String topic, int partitionId, K key, V value, long offset, Exception error) {
+ public ConsumerRecord(String topic, int partition, long offset, K key, V value) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
- this.partition = new TopicPartition(topic, partitionId);
+ this.topic = topic;
+ this.partition = partition;
+ this.offset = offset;
this.key = key;
this.value = value;
- this.offset = offset;
- this.error = error;
}
-
+
/**
* The topic this record is received from
*/
public String topic() {
- return partition.topic();
+ return this.topic;
}
/**
- * The partition from which this record is received
+ * The partition from which this record is received
*/
public int partition() {
- return partition.partition();
+ return this.partition;
}
-
- /**
- * The TopicPartition object containing the topic and partition
- */
- public TopicPartition topicAndPartition() {
- return partition;
- }
-
+
/**
* The key (or null if no key is specified)
- * @throws Exception The exception thrown while fetching this record.
*/
public K key() throws Exception {
- if (this.error != null)
- throw this.error;
return key;
}
/**
* The value
- * @throws Exception The exception thrown while fetching this record.
*/
public V value() throws Exception {
- if (this.error != null)
- throw this.error;
return value;
}
/**
* The position of this record in the corresponding Kafka partition.
- * @throws Exception The exception thrown while fetching this record.
*/
- public long offset() throws Exception {
- if (this.error != null)
- throw this.error;
+ public long offset() {
return offset;
}
- public Exception error() {
- return this.error;
+ @Override
+ public String toString() {
+ return "ConsumerRecord(topic = " + topic() + ", partition = " + partition() + ", offset = " + offset()
+ + ", key = " + key + ", value = " + value + ")";
}
}
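
A hedged illustration (not part of the patch) of the revised ConsumerRecord constructor, whose argument order is now topic, partition, offset, key, value; useful, for example, when building records by hand in tests. The topic name and values below are placeholders.

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerRecordExample {
    public static void main(String[] args) {
        // New argument order: topic, partition, offset, key, value.
        ConsumerRecord<String, String> rec =
                new ConsumerRecord<String, String>("my-topic", 0, 42L, "key", "value");
        System.out.println(rec.topic());     // "my-topic"
        System.out.println(rec.partition()); // 0
        System.out.println(rec.offset());    // 42
        System.out.println(rec);             // uses the new toString()
    }
}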
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index bdf4b26942d5..416d703c3f59 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -9,53 +9,98 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
-*/
+ */
package org.apache.kafka.clients.consumer;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.AbstractIterator;
/**
- * A container that holds the list {@link ConsumerRecord} per partition for a particular topic. There is one for every topic returned by a
- * {@link Consumer#poll(long)} operation.
+ * A container that holds the list {@link ConsumerRecord} per partition for a
+ * particular topic. There is one for every topic returned by a
+ * {@link Consumer#poll(long)} operation.
*/
-public class ConsumerRecords<K, V> {
+public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
- private final String topic;
- private final Map<Integer, List<ConsumerRecord<K, V>>> recordsPerPartition;
-
- public ConsumerRecords(String topic, Map<Integer, List<ConsumerRecord<K, V>>> records) {
- this.topic = topic;
- this.recordsPerPartition = records;
+ private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+
+ public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
+ this.records = records;
}
-
+
/**
- * @param partitions The input list of partitions for a particular topic. If no partitions are
- * specified, returns records for all partitions
- * @return The list of {@link ConsumerRecord}s associated with the given partitions.
+ * Get just the records for the given partition
+ *
+ * @param partition The partition to get records for
*/
- public List<ConsumerRecord<K, V>> records(int... partitions) {
- List<ConsumerRecord<K, V>> recordsToReturn = new ArrayList<ConsumerRecord<K, V>>();
- if(partitions.length == 0) {
- // return records for all partitions
- for(Entry<Integer, List<ConsumerRecord<K, V>>> record : recordsPerPartition.entrySet()) {
- recordsToReturn.addAll(record.getValue());
- }
- } else {
- for(int partition : partitions) {
- List<ConsumerRecord<K, V>> recordsForThisPartition = recordsPerPartition.get(partition);
- recordsToReturn.addAll(recordsForThisPartition);
- }
+ public Iterable<ConsumerRecord<K, V>> records(TopicPartition partition) {
+ List<ConsumerRecord<K, V>> recs = this.records.get(partition);
+ if (recs == null)
+ return Collections.emptyList();
+ else
+ return recs;
+ }
+
+ /**
+ * Get just the records for the given topic
+ */
+ public Iterable<ConsumerRecord<K, V>> records(String topic) {
+ if (topic == null)
+ throw new IllegalArgumentException("Topic must be non-null.");
+ List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
+ for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
+ if (entry.getKey().topic().equals(topic))
+ recs.add(entry.getValue());
}
- return recordsToReturn;
+ return new ConcatenatedIterable<K, V>(recs);
}
+ @Override
+ public Iterator<ConsumerRecord<K, V>> iterator() {
+ return new ConcatenatedIterable<K, V>(records.values()).iterator();
+ }
+
/**
- * @return The topic of all records associated with this instance
+ * The number of records for all topics
*/
- public String topic() {
- return this.topic;
+ public int count() {
+ int count = 0;
+ for (List<ConsumerRecord<K, V>> recs : this.records.values())
+ count += recs.size();
+ return count;
+ }
+
+ private static class ConcatenatedIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
+
+ private final Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables;
+
+ public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables) {
+ this.iterables = iterables;
+ }
+
+ @Override
+ public Iterator<ConsumerRecord<K, V>> iterator() {
+ return new AbstractIterator<ConsumerRecord<K, V>>() {
+ Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters = iterables.iterator();
+ Iterator<ConsumerRecord<K, V>> current;
+
+ public ConsumerRecord<K, V> makeNext() {
+ if (current == null || !current.hasNext()) {
+ if (iters.hasNext())
+ current = iters.next().iterator();
+ else
+ return allDone();
+ }
+ return current.next();
+ }
+ };
+ }
}
+
}
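
A hedged sketch (not part of the patch) showing the iteration surface that the new ConsumerRecords class exposes: iterate everything returned by a poll, narrow to a topic or a single partition, or just take the count. The topic name is a placeholder; in real code the records would come from consumer.poll(timeout).

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class RecordsIterationExample {
    public static void dump(ConsumerRecords<String, String> records) {
        System.out.println("fetched " + records.count() + " records");

        // Iterate over everything returned by the poll.
        for (ConsumerRecord<String, String> rec : records)
            System.out.println(rec);

        // Or restrict the iteration to a single topic or partition.
        for (ConsumerRecord<String, String> rec : records.records("my-topic")) // placeholder topic
            System.out.println(rec.offset());
        for (ConsumerRecord<String, String> rec : records.records(new TopicPartition("my-topic", 0)))
            System.out.println(rec.offset());
    }
}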
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 76efc216c9e6..300c551f3d21 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -9,380 +9,447 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
-*/
+ */
package org.apache.kafka.clients.consumer;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ConnectionState;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.internals.Heartbeat;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.internals.Metadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.LogEntry;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.ConsumerMetadataRequest;
+import org.apache.kafka.common.requests.ConsumerMetadataResponse;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.ListOffsetRequest;
+import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.util.*;
-
/**
* A Kafka client that consumes records from a Kafka cluster.
- *
- * The consumer is thread safe and should generally be shared among all threads for best performance.
*
- * The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it
- * needs to communicate with. Failure to close the consumer after use will leak these resources.
+ * It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of
+ * data it subscribes to migrate within the cluster. This client also interacts with the server to allow groups of
+ * consumers to load balance consumption using consumer groups (as described below).
+ *
+ * The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to.
+ * Failure to close the consumer after use will leak these connections.
+ *
+ * The consumer is thread safe but generally will be used only from within a single thread. The consumer client has no
+ * threads of its own; all work is done in the caller's thread when calls are made on the various methods exposed.
+ *
+ * Offsets and Consumer Position
+ * Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of
+ * a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer
+ * which has position 5 has consumed records with offsets 0 through 4 and will next receive record with offset 5. There
+ * are actually two notions of position relevant to the user of the consumer.
+ *
+ * The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
+ * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
+ * every time the consumer receives messages in a call to {@link #poll(long)}.
+ *
+ * The {@link #commit(CommitType) committed position} is the last offset that has been saved securely. Should the
+ * process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
+ * offsets periodically, or it can choose to control this committed position manually by calling
+ * {@link #commit(CommitType) commit}.
+ *
+ * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
+ * detail below.
+ *
+ * Consumer Groups
+ *
+ * Kafka uses the concept of consumer groups to allow a pool of processes to divide up the work of consuming and
+ * processing records. These processes can either be running on the same machine or, as is more likely, they can be
+ * distributed over many machines to provide additional scalability and fault tolerance for processing.
+ *
+ * Each Kafka consumer must specify a consumer group that it belongs to. Kafka will deliver each message in the
+ * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
+ * over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
+ * processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
+ * process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new
+ * process joins the group, partitions will be moved from existing consumers to this new process.
+ *
+ * So if two processes subscribe to a topic both specifying different groups they will each get all the records in that
+ * topic; if they both specify the same group they will each get about half the records.
+ *
+ * Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
+ * multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
+ * given topic without duplicating data (additional consumers are actually quite cheap).
+ *
+ * This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to
+ * a queue in a traditional messaging system all processes would be part of a single consumer group and hence record
+ * delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can
+ * have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
+ * have its own consumer group, so each process would subscribe to all the records published to the topic.
+ *
+ * In addition, when offsets are committed they are always committed for a given consumer group.
+ *
+ * It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic
+ * partition balancing.
+ *
* Usage Examples
- * The consumer APIs offer flexibility to cover a variety of consumption use cases. Following are some examples to demonstrate the correct use of
- * the available APIs. Each of the examples assumes the presence of a user implemented process() method that processes a given batch of messages
- * and returns the offset of the latest processed message per partition. Note that process() is not part of the consumer API and is only used as
- * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method.
+ * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
+ * demonstrate how to use them.
+ *
+ * Simple Processing
+ * This example demonstrates the simplest usage of Kafka's consumer api.
+ *
*
- * {@code
- * private Map process(Map records) {
- * Map processedOffsets = new HashMap();
- * for(Entry> recordMetadata : records.entrySet()) {
- * List> recordsPerTopic = recordMetadata.getValue().records();
- * for(int i = 0;i < recordsPerTopic.size();i++) {
- * ConsumerRecord record = recordsPerTopic.get(i);
- * // process record
- * try {
- * processedOffsets.put(record.topicAndpartition(), record.offset());
- * } catch (Exception e) {
- * e.printStackTrace();
- * }
- * }
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("enable.auto.commit", "true");
+ * props.put("auto.commit.interval.ms", "1000");
+ * props.put("session.timeout.ms", "30000");
+ * props.put("key.serializer", "org.apache.kafka.common.serializers.StringSerializer");
+ * props.put("value.serializer", "org.apache.kafka.common.serializers.StringSerializer");
+ * KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ * consumer.subscribe("foo", "bar");
+ * while (true) {
+ * ConsumerRecords<String, String> records = consumer.poll(100);
+ * for (ConsumerRecord<String, String> record : records)
+ * System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
* }
- * return processedOffsets;
- * }
- * }
*
+ *
+ * Setting enable.auto.commit means that offsets are committed automatically with a frequency controlled by
+ * the config auto.commit.interval.ms.
*
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load
- * balancing and failover. This example assumes that the offsets are stored in Kafka and are automatically committed periodically,
- * as controlled by the auto.commit.interval.ms config
- *
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "true");
- * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * consumer.subscribe("foo", "bar");
- * boolean isRunning = true;
- * while(isRunning) {
- * Map> records = consumer.poll(100);
- * process(records);
- * }
- * consumer.close();
- * }
- *
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load
- * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using
- * the commit(boolean) API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
- * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets
- * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances.
+ * The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
+ * configuration bootstrap.servers. This list is just used to discover the rest of the brokers in the
+ * cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
+ * case there are servers down when the client is connecting).
+ *
+ * In this example the client is subscribing to the topics foo and bar as part of a group of consumers
+ * called test as described above.
+ *
+ * The broker will automatically detect failed processes in the test group by using a heartbeat mechanism. The
+ * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as
+ * the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
+ * to it. If it stops heartbeating for a period of time longer than session.timeout.ms then it will be
+ * considered dead and its partitions will be assigned to another process.
+ *
+ * The deserializer settings specify how to turn the bytes read from Kafka into objects. By specifying the string
+ * deserializers we are saying that our record's key and value will just be simple strings.
+ *
+ * Controlling When Messages Are Considered Consumed
+ *
+ * In this example we will consume a batch of records and batch them up in memory; when we have sufficient records
+ * batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example messages
+ * would be considered consumed after they were given out by the consumer, and it would be possible that our process
+ * could fail after we have read messages into our in-memory buffer but before they had been inserted into the database.
+ * To avoid this we will manually commit the offsets only once the corresponding messages have been inserted into the
+ * database. This gives us exact control of when a message is considered consumed. This raises the opposite possibility:
+ * the process could fail in the interval after the insert into the database but before the commit (even though this
+ * would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption
+ * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
+ * Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one
+ * time but in failure cases could be duplicated.
+ *
*
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- * Map> records = consumer.poll(100);
- * try {
- * Map lastConsumedOffsets = process(records);
- * consumedOffsets.putAll(lastConsumedOffsets);
- * numRecords += records.size();
- * // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- * if(numRecords % commitInterval == 0)
- * consumer.commit(false);
- * } catch(Exception e) {
- * try {
- * // rewind consumer's offsets for failed partitions
- * // assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
- * List failedPartitions = failedPartitions();
- * Map offsetsToRewindTo = new HashMap();
- * for(TopicPartition failedPartition : failedPartitions) {
- * // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
- * // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
- * offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("enable.auto.commit", "false");
+ * props.put("auto.commit.interval.ms", "1000");
+ * props.put("session.timeout.ms", "30000");
+ * props.put("key.serializer", "org.apache.kafka.common.serializers.StringSerializer");
+ * props.put("value.serializer", "org.apache.kafka.common.serializers.StringSerializer");
+ * KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 200;
+ * List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
+ * while (true) {
+ * ConsumerRecords<String, String> records = consumer.poll(100);
+ * for (ConsumerRecord<String, String> record : records) {
+ * buffer.add(record);
+ * if (buffer.size() >= commitInterval) {
+ * insertIntoDb(buffer);
+ * consumer.commit(CommitType.SYNC);
+ * buffer.clear();
* }
- * // seek to new offsets only for partitions that failed the last process()
- * consumer.seek(offsetsToRewindTo);
- * } catch(Exception e) { break; } // rewind failed
+ * }
* }
- * }
- * consumer.close();
- * }
*
+ *
+ * Subscribing To Specific Partitions
+ *
+ * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
+ * a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
+ * instances of our program can divided up the work of processing records.
*
- * This example demonstrates how to rewind the offsets of the entire consumer group. It is assumed that the user has chosen to use Kafka's
- * group management functionality for automatic consumer load balancing and failover. This example also assumes that the offsets are stored in
- * Kafka. If group management is used, the right place to systematically rewind offsets for every consumer instance is inside the
- * ConsumerRebalanceCallback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance
- * and before the consumption restarts post rebalance. This is the right place to supply the newly rewound offsets to the consumer. It
- * is recommended that if you foresee the requirement to ever reset the consumer's offsets in the presence of group management, that you
- * always configure the consumer to use the ConsumerRebalanceCallback with a flag that protects whether or not the offset rewind logic is used.
- * This method of rewinding offsets is useful if you notice an issue with your message processing after successful consumption and offset commit.
- * And you would like to rewind the offsets for the entire consumer group as part of rolling out a fix to your processing logic. In this case,
- * you would configure each of your consumer instances with the offset rewind configuration flag turned on and bounce each consumer instance
- * in a rolling restart fashion. Each restart will trigger a rebalance and eventually all consumer instances would have rewound the offsets for
- * the partitions they own, effectively rewinding the offsets for the entire consumer group.
- *
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(
- * props,
- * new ConsumerRebalanceCallback() {
- * boolean rewindOffsets = true; // should be retrieved from external application config
- * public void onPartitionsAssigned(Consumer, ?> consumer, Collection partitions) {
- * Map latestCommittedOffsets = consumer.committed(partitions);
- * if(rewindOffsets)
- * Map newOffsets = rewindOffsets(latestCommittedOffsets, 100);
- * consumer.seek(newOffsets);
- * }
- * public void onPartitionsRevoked(Consumer, ?> consumer, Collection partitions) {
- * consumer.commit(true);
- * }
- * // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
- * private Map rewindOffsets(Map currentOffsets,
- * long numberOfMessagesToRewindBackTo) {
- * Map newOffsets = new HashMap();
- * for(Map.Entry offset : currentOffsets.entrySet())
- * newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
- * return newOffsets;
- * }
- * });
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- * Map> records = consumer.poll(100);
- * Map lastConsumedOffsets = process(records);
- * consumedOffsets.putAll(lastConsumedOffsets);
- * numRecords += records.size();
- * // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- * if(numRecords % commitInterval == 0)
- * consumer.commit(consumedOffsets, true);
- * }
- * consumer.commit(true);
- * consumer.close();
- * }
- *
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage.
- * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to
- * plugin logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback
- * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance and
- * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer.
+ * In this mode the consumer will only get the partitions it explicitly subscribes to, and if the consumer instance
+ * fails no attempt will be made to rebalance its partitions to other instances.
*
- * Similarly, the user would also be required to plugin logic for storing the consumer's offsets to a custom store. The onPartitionsRevoked
- * callback is invoked right after the consumer has stopped fetching data and before the partition ownership changes. This is the right place
- * to commit the offsets for the current set of partitions owned by the consumer.
- *
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
- * KafkaConsumer consumer = new KafkaConsumer(
- * props,
- * new ConsumerRebalanceCallback() {
- * public void onPartitionsAssigned(Consumer,?> consumer, Collection partitions) {
- * Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
- * consumer.seek(lastCommittedOffsets);
- * }
- * public void onPartitionsRevoked(Consumer,?> consumer, Collection partitions) {
- * Map offsets = getLastConsumedOffsets(partitions);
- * commitOffsetsToCustomStore(offsets);
- * }
- * // following APIs should be implemented by the user for custom offset management
- * private Map getLastCommittedOffsetsFromCustomStore(Collection partitions) {
- * return null;
- * }
- * private Map getLastConsumedOffsets(Collection partitions) { return null; }
- * private void commitOffsetsToCustomStore(Map offsets) {}
- * });
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- * Map> records = consumer.poll(100);
- * Map lastConsumedOffsets = process(records);
- * consumedOffsets.putAll(lastConsumedOffsets);
- * numRecords += records.size();
- * // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- * if(numRecords % commitInterval == 0)
- * commitOffsetsToCustomStore(consumedOffsets);
- * }
- * consumer.commit(true);
- * consumer.close();
- * }
- *
- * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest
- * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
- * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
- * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka
- * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does automatic failover when group
- * management is used.
- *
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("enable.auto.commit", "true");
- * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * // subscribe to some partitions of topic foo
- * TopicPartition partition0 = new TopicPartition("foo", 0);
- * TopicPartition partition1 = new TopicPartition("foo", 1);
- * TopicPartition[] partitions = new TopicPartition[2];
- * partitions[0] = partition0;
- * partitions[1] = partition1;
- * consumer.subscribe(partitions);
- * // find the last committed offsets for partitions 0,1 of topic foo
- * Map lastCommittedOffsets = consumer.committed(Arrays.asList(partitions));
- * // seek to the last committed offsets to avoid duplicates
- * consumer.seek(lastCommittedOffsets);
- * // find the offsets of the latest available messages to know where to stop consumption
- * Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- * Map> records = consumer.poll(100);
- * Map lastConsumedOffsets = process(records);
- * consumedOffsets.putAll(lastConsumedOffsets);
- * for(TopicPartition partition : partitions) {
- * if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
- * isRunning = false;
- * else
- * isRunning = true;
- * }
- * }
- * consumer.commit(true);
- * consumer.close();
- * }
- *
- * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest
- * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
- * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
- * This example assumes that the user chooses to use custom offset storage.
+ * There are several cases where this makes sense:
+ *
+ * - The first case is if the process is maintaining some kind of local state associated with that partition (like a
+ * local on-disk key-value store) and hence it should only get records for the partition it is maintaining on disk.
+ *
+ * - Another case is if the process itself is highly available and will be restarted if it fails (perhaps using a
+ * cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In
+ * this case there is no need for Kafka to detect the failure and reassign the partition, rather the consuming process
+ * will be restarted on another machine.
+ *
+ *
+ * This mode is easy to specify: rather than subscribing to the topic, the consumer just subscribes to particular
+ * partitions:
+ *
*
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * // subscribe to some partitions of topic foo
- * TopicPartition partition0 = new TopicPartition("foo", 0);
- * TopicPartition partition1 = new TopicPartition("foo", 1);
- * TopicPartition[] partitions = new TopicPartition[2];
- * partitions[0] = partition0;
- * partitions[1] = partition1;
- * consumer.subscribe(partitions);
- * Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
- * // seek to the last committed offsets to avoid duplicates
- * consumer.seek(lastCommittedOffsets);
- * // find the offsets of the latest available messages to know where to stop consumption
- * Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- * Map> records = consumer.poll(100);
- * Map lastConsumedOffsets = process(records);
- * consumedOffsets.putAll(lastConsumedOffsets);
- * // commit offsets for partitions 0,1 for topic foo to custom store
- * commitOffsetsToCustomStore(consumedOffsets);
- * for(TopicPartition partition : partitions) {
- * if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
- * isRunning = false;
- * else
- * isRunning = true;
- * }
- * }
- * commitOffsetsToCustomStore(consumedOffsets);
- * consumer.close();
- * }
+ * String topic = "foo";
+ * TopicPartition partition0 = new TopicPartition(topic, 0);
+ * TopicPartition partition1 = new TopicPartition(topic, 1);
+ * consumer.subscribe(partition0);
+ * consumer.subscribe(partition1);
*
+ *
+ * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
+ * be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
+ *
+ * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
+ * balancing) using the same consumer instance.
+ *
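+ * For example, to resume a manually subscribed partition from its last committed offset (a sketch; it assumes an
+ * offset has previously been committed for partition0 defined above):
+ *
+ *     consumer.seek(partition0, consumer.committed(partition0));
+ *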
+ *
+ * Managing Your Own Offsets
+ *
+ * The consumer application need not use Kafka's built-in offset storage; it can store offsets in a store of its own
+ * choosing. The primary use case for this is allowing the application to store both the offset and the results of the
+ * consumption in the same system in a way that both the results and offsets are stored atomically. This is not always
+ * possible, but when it is, it makes the consumption fully atomic and gives "exactly once" semantics that are
+ * stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality.
+ *
+ * Here are a couple of examples of this type of usage:
+ *
+ * - If the results of the consumption are being stored in a relational database, storing the offset in the database
+ * as well can allow committing both the results and offset in a single transaction. Thus either the transaction will
+ * succeed and the offset will be updated based on what was consumed or the result will not be stored and the offset
+ * won't be updated.
+ *
+ * - If the results are being stored in a local store it may be possible to store the offset there as well. For
+ * example, a search index could be built by subscribing to a particular partition and storing both the offset and the
+ * indexed data together. If this is done atomically, then even if a crash causes unsynced data to be lost, whatever
+ * data remains has the corresponding offset stored as well. This means that the indexing process that comes back
+ * having lost recent updates simply resumes indexing from what it has, ensuring that no updates are lost.
+ *
+ *
+ * Each record comes with its own offset, so to manage your own offset you just need to do the following:
+ *
+ * - Configure enable.auto.commit=false
+ * - Use the offset provided with each {@link ConsumerRecord} to save your position.
+ *
+ * - On restart, restore the position of the consumer using {@link #seek(TopicPartition, long)}, as sketched below.
+ *
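+ * A minimal sketch of this pattern, reusing the Properties shown in the earlier examples and assuming hypothetical
+ * readOffsetFromExternalStore() and saveRecordAndOffsetInExternalStore() helpers for whatever store the application uses:
+ *
+ *     props.put("enable.auto.commit", "false");
+ *     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ *     TopicPartition partition = new TopicPartition("foo", 0);
+ *     consumer.subscribe(partition);
+ *     // on restart, resume from the offset saved alongside the previously processed results
+ *     consumer.seek(partition, readOffsetFromExternalStore(partition));
+ *     while (true) {
+ *         ConsumerRecords<String, String> records = consumer.poll(100);
+ *         for (ConsumerRecord<String, String> record : records)
+ *             // store the result together with the offset of the next record to read, atomically
+ *             saveRecordAndOffsetInExternalStore(record, record.offset() + 1);
+ *     }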
+ *
+ * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
+ * search index use case described above). If the partition assignment is done automatically special care will also be
+ * needed to handle the case where partition assignments change. This can be handled using a special callback specified
+ * using rebalance.callback.class, which specifies an implementation of the interface
+ * {@link ConsumerRebalanceCallback}. When partitions are taken from a consumer the consumer will want to commit its
+ * offset for those partitions by implementing
+ * {@link ConsumerRebalanceCallback#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
+ * consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
+ * to that position by implementing {@link ConsumerRebalanceCallback#onPartitionsAssigned(Consumer, Collection)}.
+ *
+ * Another common use for {@link ConsumerRebalanceCallback} is to flush any caches the application maintains for
+ * partitions that are moved elsewhere.
+ *
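+ * A minimal sketch of such a callback, assuming the same hypothetical saveOffsetInExternalStore() and
+ * readOffsetFromExternalStore() helpers used above:
+ *
+ *     public class SaveOffsetsOnRebalance implements ConsumerRebalanceCallback {
+ *         public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ *             // save the position in the external store before the partitions are taken away
+ *             for (TopicPartition partition : partitions)
+ *                 saveOffsetInExternalStore(partition, consumer.position(partition));
+ *         }
+ *         public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ *             // restore the position of each newly assigned partition from the external store
+ *             for (TopicPartition partition : partitions)
+ *                 consumer.seek(partition, readOffsetFromExternalStore(partition));
+ *         }
+ *     }
+ *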
+ *
+ * Controlling The Consumer's Position
+ *
+ * In most use cases the consumer will simply consume records from beginning to end, periodically committing its
+ * position (either automatically or manually). However, Kafka allows the consumer to manually control its position,
+ * moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to
+ * the most recent records without actually consuming the intermediate records.
+ *
+ * There are several instances where manually controlling the consumer's position can be useful.
+ *
+ * One case is time-sensitive record processing: it may make sense for a consumer that has fallen far enough behind to
+ * skip ahead to the most recent records rather than attempt to catch up by processing every record.
+ *
+ * Another use case is for a system that maintains local state as described in the previous section. In such a system
+ * the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise,
+ * if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
+ * reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
+ *
+ * Kafka allows the consumer to specify the new position using {@link #seek(TopicPartition, long)}. Special
+ * methods for seeking to the earliest and latest offset the server maintains are also available (
+ * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
+ *
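+ * For example, to replay a partition from the beginning, or to jump to an arbitrary offset (a sketch; it assumes the
+ * consumer is already subscribed to the partition and the offset 12345 is purely illustrative):
+ *
+ *     TopicPartition partition = new TopicPartition("foo", 0);
+ *     consumer.seekToBeginning(partition); // re-consume the partition from its first offset
+ *     consumer.seek(partition, 12345L);    // or jump straight to offset 12345
+ *     ConsumerRecords<String, String> records = consumer.poll(100);
+ *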
+ *
+ * Multithreaded Processing
+ *
+ * The Kafka consumer is threadsafe but coarsely synchronized. All network I/O happens in the thread of the application
+ * making the call. We have intentionally avoided implementing a particular threading model for processing.
+ *
+ * This leaves several options for implementing multi-threaded processing of records.
+ *
+ *
+ * 1. One Consumer Per Thread
+ *
+ * A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
+ *
+ * - PRO: It is the easiest to implement
+ *
+ * - PRO: It is often the fastest as no inter-thread co-ordination is needed
+ *
+ * - PRO: It makes in-order processing on a per-partition basis very easy to implement (each thread just
+ * processes records in the order it receives them).
+ *
+ * - CON: More consumers means more TCP connections to the cluster (one per thread). Kafka handles
+ * connections very efficiently so this is generally a small cost.
+ *
+ * - CON: Multiple consumers means more requests being sent to the server and slightly less batching of data
+ * which can cause some drop in I/O throughput.
+ *
+ * - CON: The number of total threads across all processes will be limited by the total number of partitions.
+ *
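+ * A sketch of this approach, assuming a hypothetical runConsumer() method that contains the poll loop from the
+ * earlier examples:
+ *
+ *     // assuming "props" holds the consumer configuration shown in the earlier examples
+ *     for (int i = 0; i < 3; i++) {
+ *         new Thread(new Runnable() {
+ *             public void run() {
+ *                 // each thread gets its own consumer instance and runs its own poll loop
+ *                 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ *                 consumer.subscribe("foo", "bar");
+ *                 runConsumer(consumer); // hypothetical method containing the poll/process loop
+ *             }
+ *         }).start();
+ *     }
+ *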
+ *
+ * 2. Decouple Consumption and Processing
+ *
+ * Another alternative is to have one or more consumer threads that do all data consumption and hand off
+ * {@link ConsumerRecords} instances to a blocking queue consumed by a pool of processor threads that actually handle
+ * the record processing.
+ *
+ * This option likewise has pros and cons:
+ *
+ * - PRO: This option allows independently scaling the number of consumers and processors. This makes it
+ * possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
+ *
+ * - CON: Guaranteeing order across the processors requires particular care: since the threads execute
+ * independently, an earlier chunk of data may actually be processed after a later chunk of data just due to the luck
+ * of thread execution timing. For processing that has no ordering requirements this is not a problem.
+ *
+ * - CON: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure
+ * that processing is complete for that partition.
+ *
+ *
+ * There are many possible variations on this approach. For example, each processor thread can have its own queue, and
+ * the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify
+ * commit.
+ *
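+ * A rough sketch of the basic hand-off, assuming a hypothetical process() method running in each processor thread
+ * (InterruptedException handling omitted for brevity):
+ *
+ *     final BlockingQueue<ConsumerRecords<String, String>> queue =
+ *         new ArrayBlockingQueue<ConsumerRecords<String, String>>(100);
+ *     // consumer thread: fetch records and enqueue them for processing
+ *     while (true)
+ *         queue.put(consumer.poll(100));
+ *     // each processor thread: dequeue and process
+ *     while (true)
+ *         process(queue.take());
+ *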
*/
-public class KafkaConsumer implements Consumer {
+public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
+ private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
+ private static final long LATEST_OFFSET_TIMESTAMP = -1L;
+ private static final AtomicInteger consumerAutoId = new AtomicInteger(1);
- private final long metadataFetchTimeoutMs;
- private final long totalMemorySize;
- private final Metrics metrics;
- private final Set subscribedTopics;
- private final Set subscribedPartitions;
+ private final Time time;
+ private final ConsumerMetrics metrics;
private final Deserializer keyDeserializer;
private final Deserializer valueDeserializer;
+ private final SubscriptionState subscriptions;
+ private final Metadata metadata;
+ private final Heartbeat heartbeat;
+ private final NetworkClient client;
+ private final int maxWaitMs;
+ private final int minBytes;
+ private final int fetchSize;
+ private final boolean autoCommit;
+ private final long autoCommitIntervalMs;
+ private final String group;
+ private final long sessionTimeoutMs;
+ private final long retryBackoffMs;
+ private final String partitionAssignmentStrategy;
+ private final AutoOffsetResetStrategy offsetResetStrategy;
+ private final ConsumerRebalanceCallback rebalanceCallback;
+ private final List> records;
+ private final boolean checkCrcs;
+ private long lastCommitAttemptMs;
+ private String consumerId;
+ private Node consumerCoordinator;
+ private boolean closed = false;
+ private int generation;
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
- * are documented here. Values can be
- * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
+ * are documented here. Values can be
+ * either strings or objects of the appropriate type (for example a numeric configuration would accept either the
* string "42" or the integer 42).
*
* Valid configuration strings are documented at {@link ConsumerConfig}
- * @param configs The consumer configs
+ *
+ * @param configs The consumer configs
*/
public KafkaConsumer(Map configs) {
- this(configs, null);
+ this(configs, null, null, null);
}
/**
- * A consumer is instantiated by providing a set of key-value pairs as configuration and a {@link ConsumerRebalanceCallback}
- * implementation
+ * A consumer is instantiated by providing a set of key-value pairs as configuration, a
+ * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
*
* Valid configuration strings are documented at {@link ConsumerConfig}
- * @param configs The consumer configs
- * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
- * every rebalance operation.
+ *
+ * @param configs The consumer configs
+ * @param callback A callback interface that the user can implement to manage customized offsets on the start and
+ * end of every rebalance operation.
+ * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
+ * won't be called in the consumer when the deserializer is passed in directly.
+ * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
+ * won't be called in the consumer when the deserializer is passed in directly.
*/
- public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback) {
- this(configs, callback, null, null);
- }
-
- /**
- * A consumer is instantiated by providing a set of key-value pairs as configuration, a {@link ConsumerRebalanceCallback}
- * implementation, a key and a value {@link Deserializer}.
- *
- * Valid configuration strings are documented at {@link ConsumerConfig}
- * @param configs The consumer configs
- * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
- * every rebalance operation.
- * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method won't
- * be called in the consumer when the deserializer is passed in directly.
- * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
- * won't be called in the consumer when the deserializer is passed in directly.
- */
- public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) {
+ public KafkaConsumer(Map configs,
+ ConsumerRebalanceCallback callback,
+ Deserializer keyDeserializer,
+ Deserializer valueDeserializer) {
this(new ConsumerConfig(addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
- callback, keyDeserializer, valueDeserializer);
+ callback,
+ keyDeserializer,
+ valueDeserializer);
}
private static Map addDeserializerToConfig(Map configs,
- Deserializer> keyDeserializer, Deserializer> valueDeserializer) {
+ Deserializer> keyDeserializer,
+ Deserializer> valueDeserializer) {
Map newConfigs = new HashMap();
newConfigs.putAll(configs);
if (keyDeserializer != null)
@@ -393,24 +460,13 @@ private static Map addDeserializerToConfig(Map c
}
/**
- * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration.
- * Valid configuration strings are documented at {@link ConsumerConfig}
+ * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration. Valid
+ * configuration strings are documented at {@link ConsumerConfig}.
*/
public KafkaConsumer(Properties properties) {
- this(properties, null);
- }
-
- /**
- * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
- * {@link ConsumerRebalanceCallback} implementation.
- *
- * Valid configuration strings are documented at {@link ConsumerConfig}
- * @param properties The consumer configuration properties
- * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
- * every rebalance operation.
- */
- public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) {
- this(properties, callback, null, null);
+ this(properties, null, null, null);
}
/**
@@ -418,21 +474,28 @@ public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback)
* {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
*
* Valid configuration strings are documented at {@link ConsumerConfig}
+ *
* @param properties The consumer configuration properties
- * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
- * every rebalance operation.
- * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method won't
- * be called in the consumer when the deserializer is passed in directly.
- * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
- * won't be called in the consumer when the deserializer is passed in directly.
- */
- public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) {
+ * @param callback A callback interface that the user can implement to manage customized offsets on the start and
+ * end of every rebalance operation.
+ * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
+ * won't be called in the consumer when the deserializer is passed in directly.
+ * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
+ * won't be called in the consumer when the deserializer is passed in directly.
+ */
+ public KafkaConsumer(Properties properties,
+ ConsumerRebalanceCallback callback,
+ Deserializer keyDeserializer,
+ Deserializer valueDeserializer) {
this(new ConsumerConfig(addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
- callback, keyDeserializer, valueDeserializer);
+ callback,
+ keyDeserializer,
+ valueDeserializer);
}
private static Properties addDeserializerToConfig(Properties properties,
- Deserializer> keyDeserializer, Deserializer> valueDeserializer) {
+ Deserializer> keyDeserializer,
+ Deserializer> valueDeserializer) {
Properties newProperties = new Properties();
newProperties.putAll(properties);
if (keyDeserializer != null)
@@ -442,17 +505,12 @@ private static Properties addDeserializerToConfig(Properties properties,
return newProperties;
}
- private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) {
- log.trace("Starting the Kafka consumer");
- subscribedTopics = new HashSet();
- subscribedPartitions = new HashSet();
- this.metrics = new Metrics(new MetricConfig(),
- Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")),
- new SystemTime());
- this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
- this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
- List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-
+ @SuppressWarnings("unchecked")
+ private KafkaConsumer(ConsumerConfig config,
+ ConsumerRebalanceCallback callback,
+ Deserializer keyDeserializer,
+ Deserializer valueDeserializer) {
+ log.debug("Starting the Kafka consumer");
if (keyDeserializer == null)
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
@@ -463,181 +521,1072 @@ private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback,
Deserializer.class);
else
this.valueDeserializer = valueDeserializer;
+ if (callback == null)
+ this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
+ ConsumerRebalanceCallback.class);
+ else
+ this.rebalanceCallback = callback;
+ this.time = new SystemTime();
+ this.maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
+ this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG);
+ this.fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+ this.group = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
+ this.records = new LinkedList>();
+ this.sessionTimeoutMs = config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
+ this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
+ this.partitionAssignmentStrategy = config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
+ this.offsetResetStrategy = AutoOffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+ .toUpperCase());
+ this.checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
+ this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+ this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+ this.lastCommitAttemptMs = time.milliseconds();
+
+ MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
+ .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+ TimeUnit.MILLISECONDS);
+ String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+ String jmxPrefix = "kafka.consumer";
+ if (clientId.length() <= 0)
+ clientId = "consumer-" + consumerAutoId.getAndIncrement();
+ List reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class);
+ reporters.add(new JmxReporter(jmxPrefix));
+ Metrics metrics = new Metrics(metricConfig, reporters, time);
+ this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+ this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
+ List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ this.metadata.update(Cluster.bootstrap(addresses), 0);
+
+ String metricsGroup = "consumer";
+ Map metricsTags = new LinkedHashMap();
+ metricsTags.put("client-id", clientId);
+ long reconnectBackoffMs = config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG);
+ int sendBuffer = config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG);
+ int receiveBuffer = config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG);
+ this.client = new NetworkClient(new Selector(metrics, time, metricsGroup, metricsTags),
+ this.metadata,
+ clientId,
+ 100,
+ reconnectBackoffMs,
+ sendBuffer,
+ receiveBuffer);
+ this.subscriptions = new SubscriptionState();
+ this.metrics = new ConsumerMetrics(metrics, metricsGroup, metricsTags);
config.logUnused();
- log.debug("Kafka consumer started");
+
+ this.consumerCoordinator = null;
+ this.consumerId = "";
+ this.generation = -1;
+ log.debug("Kafka consumer created");
+ }
+
+ /**
+ * The set of partitions currently assigned to this consumer. If subscription happened by directly subscribing to
+ * partitions using {@link #subscribe(TopicPartition...)} then this will simply return the list of partitions that
+ * were subscribed to. If subscription was done by specifying only the topic using {@link #subscribe(String...)}
+ * then this will give the set of topic partitions currently assigned to the consumer (which may be none if the assignment
+ * hasn't happened yet, or the partitions are in the process of getting reassigned).
+ */
+ public synchronized Set<TopicPartition> subscriptions() {
+ return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
}
/**
* Incrementally subscribes to the given list of topics and uses the consumer's group management functionality
*
- * As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and
- * will trigger a rebalance operation if one of the following events trigger -
+ * As part of group management, the consumer will keep track of the list of consumers that belong to a particular
+ * group and will trigger a rebalance operation if any one of the following events occurs:
*
- * - Number of partitions change for any of the subscribed list of topics
- *
- Topic is created or deleted
- *
- An existing member of the consumer group dies
- *
- A new member is added to an existing consumer group via the join API
- *
+ * - Number of partitions change for any of the subscribed list of topics
+ * - Topic is created or deleted
+ * - An existing member of the consumer group dies
+ * - A new member is added to an existing consumer group via the join API
+ *
+ *
* @param topics A variable list of topics that the consumer wants to subscribe to
*/
@Override
- public void subscribe(String... topics) {
- if(subscribedPartitions.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
- for(String topic:topics)
- subscribedTopics.add(topic);
- // TODO: trigger a rebalance operation
+ public synchronized void subscribe(String... topics) {
+ ensureNotClosed();
+ log.debug("Subscribed to topic(s): ", Utils.join(topics, ", "));
+ for (String topic : topics)
+ this.subscriptions.subscribe(topic);
+ metadata.addTopics(topics);
}
/**
- * Incrementally subscribes to a specific topic partition and does not use the consumer's group management functionality. As such,
- * there will be no rebalance operation triggered when group membership or cluster and topic metadata change.
+ * Incrementally subscribes to a specific topic partition and does not use the consumer's group management
+ * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
+ * metadata change.
*
+ *
* @param partitions Partitions to incrementally subscribe to
*/
@Override
- public void subscribe(TopicPartition... partitions) {
- if(subscribedTopics.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
- for(TopicPartition partition:partitions)
- subscribedPartitions.add(partition);
+ public synchronized void subscribe(TopicPartition... partitions) {
+ ensureNotClosed();
+ log.debug("Subscribed to partitions(s): ", Utils.join(partitions, ", "));
+ for (TopicPartition tp : partitions) {
+ this.subscriptions.subscribe(tp);
+ metadata.addTopics(tp.topic());
+ }
}
/**
- * Unsubscribe from the specific topics. This will trigger a rebalance operation and messages for this topic will not be returned
- * from the next {@link #poll(long) poll()} onwards
+ * Unsubscribe from the specific topics. This will trigger a rebalance operation and records for this topic will not
+ * be returned from the next {@link #poll(long) poll()} onwards
+ *
* @param topics Topics to unsubscribe from
*/
- public void unsubscribe(String... topics) {
+ public synchronized void unsubscribe(String... topics) {
+ ensureNotClosed();
+ log.debug("Unsubscribed from topic(s): ", Utils.join(topics, ", "));
// throw an exception if the topic was never subscribed to
- for(String topic:topics) {
- if(!subscribedTopics.contains(topic))
- throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" +
- " to unsubscribe(" + topic + ")");
- subscribedTopics.remove(topic);
- }
- // TODO trigger a rebalance operation
+ for (String topic : topics)
+ this.subscriptions.unsubscribe(topic);
}
/**
- * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next
+ * Unsubscribe from the specific topic partitions. Records for these partitions will not be returned from the next
* {@link #poll(long) poll()} onwards
+ *
* @param partitions Partitions to unsubscribe from
*/
- public void unsubscribe(TopicPartition... partitions) {
+ public synchronized void unsubscribe(TopicPartition... partitions) {
+ ensureNotClosed();
+ log.debug("Unsubscribed from partitions(s): ", Utils.join(partitions, ", "));
// throw an exception if the partition was never subscribed to
- for(TopicPartition partition:partitions) {
- if(!subscribedPartitions.contains(partition))
- throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" +
- partition.topic() + "," + partition.partition() + ") should be called prior" +
- " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
- subscribedPartitions.remove(partition);
- }
- // trigger a rebalance operation
+ for (TopicPartition partition : partitions)
+ this.subscriptions.unsubscribe(partition);
}
-
+
/**
- * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to
- * any topics or partitions before polling for data.
- *
- * The offset used for fetching the data is governed by whether or not {@link #seek(Map) seek(offsets)}
- * is used. If {@link #seek(Map) seek(offsets)} is used, it will use the specified offsets on startup and
- * on every rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed offset
- * using {@link #commit(Map, boolean) commit(offsets, sync)}
- * for the subscribed list of partitions.
- * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative
+ * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have
+ * subscribed to any topics or partitions before polling for data.
+ *
+ * The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
+ * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
+ * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
+ * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions.
+ *
+ * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits
+ * indefinitely. Must not be negative
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
+ *
+ * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic
+ * offset reset policy has been configured.
*/
@Override
- public Map> poll(long timeout) {
- // TODO Auto-generated method stub
- return null;
+ public synchronized ConsumerRecords<K, V> poll(long timeout) {
+ ensureNotClosed();
+ long now = time.milliseconds();
+
+ if (subscriptions.partitionsAutoAssigned()) {
+ // get partition assignment if needed
+ if (subscriptions.needsPartitionAssignment()) {
+ joinGroup(now);
+ } else if (!heartbeat.isAlive(now)) {
+ log.error("Failed heartbeat check.");
+ coordinatorDead();
+ } else if (heartbeat.shouldHeartbeat(now)) {
+ initiateHeartbeat(now);
+ }
+ }
+
+ // fetch positions if we have partitions we're subscribed to that we
+ // don't know the offset for
+ if (!subscriptions.hasAllFetchPositions())
+ fetchMissingPositionsOrResetThem(this.subscriptions.missingFetchPositions(), now);
+
+ // maybe autocommit position
+ if (shouldAutoCommit(now))
+ commit(CommitType.ASYNC);
+
+ /*
+ * initiate any needed fetches, then block for the timeout the user specified
+ */
+ Cluster cluster = this.metadata.fetch();
+ reinstateFetches(cluster, now);
+ client.poll(timeout, now);
+
+ /*
+ * initiate a fetch request for any nodes that we just got a response from without blocking
+ */
+ reinstateFetches(cluster, now);
+ client.poll(0, now);
+
+ return new ConsumerRecords<K, V>(consumeBufferedRecords());
}
/**
* Commits the specified offsets for the specified list of topics and partitions to Kafka.
*
- * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance
- * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
- * @param offsets The list of offsets per partition that should be committed to Kafka.
- * @param sync If true, commit will block until the consumer receives an acknowledgment
- * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
- * if the sync flag is set to false.
+ * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+ * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+ * should not be used.
+ *
+ * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails.
+ * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
+ * the commit succeeds.
+ *
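+ * For example, to synchronously commit a specific offset for a single partition (a sketch; the offset value 42 is
+ * purely illustrative):
+ *
+ *     Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
+ *     offsets.put(new TopicPartition("foo", 0), 42L);
+ *     consumer.commit(offsets, CommitType.SYNC);
+ *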
+ * @param offsets The list of offsets per partition that should be committed to Kafka.
+ * @param commitType Control whether the commit is blocking
*/
@Override
- public OffsetMetadata commit(Map offsets, boolean sync) {
- throw new UnsupportedOperationException();
+ public synchronized void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
+ ensureNotClosed();
+ log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
+ long now = time.milliseconds();
+ this.lastCommitAttemptMs = now;
+ if (!offsets.isEmpty()) {
+ Map offsetData = new HashMap(offsets.size());
+ for (Map.Entry entry : offsets.entrySet())
+ offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), now, ""));
+ OffsetCommitRequest req = new OffsetCommitRequest(this.group, this.generation, this.consumerId, offsetData);
+
+ RequestCompletionHandler handler = new RequestCompletionHandler() {
+ public void onComplete(ClientResponse resp) {
+ if (resp.wasDisconnected()) {
+ handleDisconnect(resp, time.milliseconds());
+ } else {
+ OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
+ for (Map.Entry entry : response.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ short errorCode = entry.getValue();
+ long offset = offsets.get(tp);
+ if (errorCode == Errors.NONE.code()) {
+ log.debug("Committed offset {} for partition {}", offset, tp);
+ subscriptions.committed(tp, offset);
+ } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+ || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+ coordinatorDead();
+ } else {
+ log.error("Error committing partition {} at offset {}: {}",
+ tp,
+ offset,
+ Errors.forCode(errorCode).exception().getMessage());
+ }
+ }
+ }
+ metrics.commitLatency.record(resp.requestLatencyMs());
+ }
+ };
+
+ if (commitType == CommitType.ASYNC) {
+ this.initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
+ return;
+ } else {
+ boolean done;
+ do {
+ ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT,
+ req.toStruct(),
+ handler,
+ now);
+
+ // check for errors
+ done = true;
+ OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody());
+ for (short errorCode : commitResponse.responseData().values()) {
+ if (errorCode != Errors.NONE.code())
+ done = false;
+ }
+ if (!done) {
+ log.debug("Error in offset commit, backing off for {} ms before retrying again.",
+ this.retryBackoffMs);
+ Utils.sleep(this.retryBackoffMs);
+ }
+ } while (!done);
+ }
+ }
}
/**
- * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and
- * partitions.
+ * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
*
- * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance
- * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
- * @param sync If true, commit will block until the consumer receives an acknowledgment
- * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
- * if the sync flag is set to false.
+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
+ * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+ * should not be used.
+ *
+ * @param commitType Whether or not the commit should block until it is acknowledged.
*/
@Override
- public OffsetMetadata commit(boolean sync) {
- throw new UnsupportedOperationException();
+ public synchronized void commit(CommitType commitType) {
+ ensureNotClosed();
+ commit(this.subscriptions.allConsumed(), commitType);
}
/**
- * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API is invoked
- * for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is
- * arbitrarily used in the middle of consumption, to reset the fetch offsets
+ * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
+ * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
+ * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
*/
@Override
- public void seek(Map offsets) {
+ public synchronized void seek(TopicPartition partition, long offset) {
+ ensureNotClosed();
+ log.debug("Seeking to offset {} for partition {}", offset, partition);
+ this.subscriptions.seek(partition, offset);
}
/**
- * Returns the fetch position of the next message for the specified topic partition to be used on the next {@link #poll(long) poll()}
- * @param partitions Partitions for which the fetch position will be returned
- * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()}
+ * Seek to the first offset for each of the given partitions
*/
- public Map position(Collection partitions) {
- return null;
+ public synchronized void seekToBeginning(TopicPartition... partitions) {
+ ensureNotClosed();
+ Collection parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
+ : Arrays.asList(partitions);
+ for (TopicPartition tp : parts) {
+ // TODO: list offset call could be optimized by grouping by node
+ seek(tp, listOffset(tp, EARLIEST_OFFSET_TIMESTAMP));
+ }
}
/**
- * Fetches the last committed offsets of partitions that the consumer currently consumes. This API is only relevant if Kafka based offset
- * storage is used. This API can be used in conjunction with {@link #seek(Map) seek(offsets)} to rewind consumption of data.
- * @param partitions The list of partitions to return the last committed offset for
- * @return The list of offsets committed on the last {@link #commit(boolean) commit(sync)}
+ * Seek to the last offset for each of the given partitions
*/
- @Override
- public Map committed(Collection partitions) {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException();
+ public synchronized void seekToEnd(TopicPartition... partitions) {
+ ensureNotClosed();
+ Collection parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
+ : Arrays.asList(partitions);
+ for (TopicPartition tp : parts) {
+ // TODO: list offset call could be optimized by grouping by node
+ seek(tp, listOffset(tp, LATEST_OFFSET_TIMESTAMP));
+ }
}
/**
- * Fetches offsets before a certain timestamp. Note that the offsets returned are approximately computed and do not correspond to the exact
- * message at the given timestamp. As such, if the consumer is rewound to offsets returned by this API, there may be duplicate messages
- * returned by the consumer.
- * @param partitions The list of partitions for which the offsets are returned
- * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp.
- * @return The offsets per partition before the specified timestamp.
- */
- public Map offsetsBeforeTime(long timestamp, Collection partitions) {
- return null;
+ * Returns the offset of the next record that will be fetched (if a record with that offset exists).
+ *
+ * @param partition The partition to get the position for
+ * @return The offset
+ * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
+ * available.
+ */
+ public synchronized long position(TopicPartition partition) {
+ ensureNotClosed();
+ if (!this.subscriptions.assignedPartitions().contains(partition))
+ throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
+ Long offset = this.subscriptions.consumed(partition);
+ if (offset == null) {
+ fetchMissingPositionsOrResetThem(Collections.singleton(partition), time.milliseconds());
+ return this.subscriptions.consumed(partition);
+ } else {
+ return offset;
+ }
}
+ /**
+ * Fetches the last committed offset for the given partition (whether the commit happened by this process or
+ * another). This offset will be used as the position for the consumer in the event of a failure.
+ *
+ * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the
+ * consumer hasn't yet initialized its cache of committed offsets.
+ *
+ * @param partition The partition to check
+ * @return The last committed offset or null if no offset has been committed
+ * @throws NoOffsetForPartitionException If no offset has ever been committed by any process for the given
+ * partition.
+ */
+ @Override
+ public synchronized long committed(TopicPartition partition) {
+ ensureNotClosed();
+ Set partitionsToFetch;
+ if (subscriptions.assignedPartitions().contains(partition)) {
+ Long committed = this.subscriptions.committed(partition);
+ if (committed != null)
+ return committed;
+ partitionsToFetch = subscriptions.assignedPartitions();
+ } else {
+ partitionsToFetch = Collections.singleton(partition);
+ }
+ this.refreshCommittedOffsets(time.milliseconds(), partitionsToFetch);
+ Long committed = this.subscriptions.committed(partition);
+ if (committed == null)
+ throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
+ return committed;
+ }
+
+ /**
+ * Get the metrics kept by the consumer
+ */
@Override
public Map metrics() {
- return Collections.unmodifiableMap(this.metrics.metrics());
+ return Collections.unmodifiableMap(this.metrics.metrics.metrics());
}
+ /**
+ * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
+ * does not already have any metadata about the given topic.
+ *
+ * @param topic The topic to get partition metadata for
+ * @return The list of partitions
+ */
@Override
- public void close() {
+ public List<PartitionInfo> partitionsFor(String topic) {
+ Cluster cluster = this.metadata.fetch();
+ List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
+ if (parts == null) {
+ metadata.add(topic);
+ awaitMetadataUpdate();
+ parts = metadata.fetch().partitionsForTopic(topic);
+ }
+ return parts;
+ }
+
+ @Override
+ public synchronized void close() {
log.trace("Closing the Kafka consumer.");
- subscribedTopics.clear();
- subscribedPartitions.clear();
- this.metrics.close();
+ this.closed = true;
+ this.metrics.metrics.close();
+ this.client.close();
log.debug("The Kafka consumer has closed.");
}
+
+ private boolean shouldAutoCommit(long now) {
+ return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
+ }
+
+ /*
+ * Request a metadata update and wait until it has occurred
+ */
+ private void awaitMetadataUpdate() {
+ int version = this.metadata.requestUpdate();
+ do {
+ long now = time.milliseconds();
+ this.client.poll(this.retryBackoffMs, now);
+ } while (this.metadata.version() == version);
+ }
+
+ /*
+ * Send a join group request to the controller
+ */
+ private void joinGroup(long now) {
+ log.debug("Joining group {}", group);
+
+ // execute the user's callback
+ try {
+ // TODO: Hmmm, is passing the full Consumer like this actually safe?
+ // Need to think about reentrancy...
+ this.rebalanceCallback.onPartitionsRevoked(this, this.subscriptions.assignedPartitions());
+ } catch (Exception e) {
+ log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+ + " failed on partition revocation: ", e);
+ }
+
+ // join the group
+ JoinGroupRequest jgr = new JoinGroupRequest(group,
+ (int) this.sessionTimeoutMs,
+ new ArrayList(this.subscriptions.subscribedTopics()),
+ this.consumerId,
+ this.partitionAssignmentStrategy);
+ ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, jgr.toStruct(), null, now);
+ // process the response
+ JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
+ log.debug("Joined group: {}", response);
+ Errors.forCode(response.errorCode()).maybeThrow();
+ this.consumerId = response.consumerId();
+ this.subscriptions.changePartitionAssignment(response.assignedPartitions());
+ this.heartbeat.receivedResponse(now);
+
+ // execute the callback
+ try {
+ // TODO: Hmmm, is passing the full Consumer like this actually safe?
+ this.rebalanceCallback.onPartitionsAssigned(this, this.subscriptions.assignedPartitions());
+ } catch (Exception e) {
+ log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+ + " failed on partition assignment: ", e);
+ }
+
+ // record re-assignment time
+ this.metrics.partitionReassignments.record(time.milliseconds() - now);
+ }
+
+ /*
+ * Empty the record buffer and update the consumed position.
+ */
+ private Map>> consumeBufferedRecords() {
+ if (this.subscriptions.needsPartitionAssignment()) {
+ return Collections.emptyMap();
+ } else {
+ Map>> drained = new HashMap>>();
+ for (PartitionRecords part : this.records) {
+ Long consumed = subscriptions.consumed(part.partition);
+ if (this.subscriptions.assignedPartitions().contains(part.partition)
+ && (consumed == null || part.fetchOffset == consumed)) {
+ List> partRecs = drained.get(part.partition);
+ if (partRecs == null) {
+ partRecs = part.records;
+ drained.put(part.partition, partRecs);
+ } else {
+ partRecs.addAll(part.records);
+ }
+ subscriptions.consumed(part.partition, part.records.get(part.records.size() - 1).offset() + 1);
+ } else {
+ // these records aren't next in line based on the last consumed position, ignore them
+ // they must be from an obsolete request
+ log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
+ }
+ }
+ this.records.clear();
+ return drained;
+ }
+ }
+
+ /*
+ * Set up a fetch request for any node for which we have assigned partitions but no fetch currently in flight
+ */
+ private void reinstateFetches(Cluster cluster, long now) {
+ for (ClientRequest request : createFetchRequests(cluster)) {
+ Node node = cluster.nodeById(request.request().destination());
+ if (client.ready(node, now)) {
+ log.trace("Initiating fetch to node {}: {}", node.id(), request);
+ client.send(request);
+ }
+ }
+ }
+
+ /*
+ * Create fetch requests for all nodes for which we have assigned partitions that have no existing requests in
+ * flight
+ */
+ private List createFetchRequests(Cluster cluster) {
+ Map> fetchable = new HashMap>();
+ for (TopicPartition partition : subscriptions.assignedPartitions()) {
+ Node node = cluster.leaderFor(partition);
+ // if there is a leader and no in-flight requests, issue a new fetch
+ if (node != null && this.client.inFlightRequestCount(node.id()) == 0) {
+ Map fetch = fetchable.get(node);
+ if (fetch == null) {
+ fetch = new HashMap();
+ fetchable.put(node.id(), fetch);
+ }
+ long offset = this.subscriptions.fetched(partition);
+ fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
+ }
+ }
+ List requests = new ArrayList(fetchable.size());
+ for (Map.Entry> entry : fetchable.entrySet()) {
+ int nodeId = entry.getKey();
+ final FetchRequest fetch = new FetchRequest(this.maxWaitMs, minBytes, entry.getValue());
+ RequestSend send = new RequestSend(nodeId, this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct());
+ RequestCompletionHandler handler = new RequestCompletionHandler() {
+ public void onComplete(ClientResponse response) {
+ handleFetchResponse(response, fetch);
+ }
+ };
+ requests.add(new ClientRequest(time.milliseconds(), true, send, handler));
+ }
+ return requests;
+ }
+
+ private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
+ if (resp.wasDisconnected()) {
+ handleDisconnect(resp, time.milliseconds());
+ } else {
+ int totalBytes = 0;
+ int totalCount = 0;
+ FetchResponse response = new FetchResponse(resp.responseBody());
+ for (Map.Entry entry : response.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ FetchResponse.PartitionData partition = entry.getValue();
+ if (!subscriptions.assignedPartitions().contains(tp)) {
+ log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
+ } else if (partition.errorCode == Errors.NONE.code()) {
+ ByteBuffer buffer = partition.recordSet;
+ buffer.position(buffer.limit()); // TODO: arguably we should not have to muck with the position here
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ long fetchOffset = request.fetchData().get(tp).offset;
+ int bytes = 0;
+ List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
+ for (LogEntry logEntry : records) {
+ parsed.add(parseRecord(tp, logEntry));
+ bytes += logEntry.size();
+ }
+ if (parsed.size() > 0) {
+ ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
+ this.subscriptions.fetched(tp, record.offset() + 1);
+ this.metrics.lag.record(partition.highWatermark - record.offset());
+ this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
+ }
+ this.metrics.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
+ totalBytes += bytes;
+ totalCount += parsed.size();
+ } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+ || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()
+ || partition.errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
+ this.metadata.requestUpdate();
+ } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
+ // TODO: this could be optimized by grouping all out-of-range partitions
+ resetOffset(tp, time.milliseconds());
+ }
+ }
+ this.metrics.bytesFetched.record(totalBytes);
+ this.metrics.recordsFetched.record(totalCount);
+ }
+ this.metrics.fetchLatency.record(resp.requestLatencyMs());
+ }
+
+ private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
+ if (this.checkCrcs)
+ logEntry.record().ensureValid();
+ long offset = logEntry.offset();
+ ByteBuffer keyBytes = logEntry.record().key();
+ K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
+ ByteBuffer valueBytes = logEntry.record().value();
+ V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(),
+ Utils.toArray(valueBytes));
+ return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
+ }
+
+ /*
+ * Begin sending a heartbeat to the consumer co-ordinator but don't wait for the response
+ */
+ private void initiateHeartbeat(long now) {
+ ensureCoordinatorReady();
+ log.debug("Sending heartbeat to co-ordinator.");
+ HeartbeatRequest req = new HeartbeatRequest(this.group, this.generation, this.consumerId);
+ RequestSend send = new RequestSend(this.consumerCoordinator.id(),
+ this.client.nextRequestHeader(ApiKeys.HEARTBEAT),
+ req.toStruct());
+
+ RequestCompletionHandler handler = new RequestCompletionHandler() {
+ public void onComplete(ClientResponse resp) {
+ if (resp.wasDisconnected()) {
+ coordinatorDead();
+ } else {
+ HeartbeatResponse response = new HeartbeatResponse(resp.responseBody());
+ if (response.errorCode() == Errors.NONE.code()) {
+ log.debug("Received successful heartbeat response.");
+ heartbeat.receivedResponse(time.milliseconds());
+ } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+ || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+ coordinatorDead();
+ } else {
+ throw new KafkaException("Unexpected error in hearbeat response: "
+ + Errors.forCode(response.errorCode()).exception().getMessage());
+ }
+ }
+ metrics.heartbeatLatency.record(resp.requestLatencyMs());
+ }
+ };
+ this.client.send(new ClientRequest(now, true, send, handler));
+ this.heartbeat.sentHeartbeat(now);
+ }
+
+ private void coordinatorDead() {
+ log.info("Marking the co-ordinator dead.");
+ heartbeat.markDead();
+ if (subscriptions.partitionsAutoAssigned())
+ subscriptions.clearAssignment();
+ this.consumerCoordinator = null;
+ }
+
+ /*
+ * Initiate a request to the co-ordinator but don't wait for a response.
+ */
+ private void initiateCoordinatorRequest(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
+ log.debug("Issuing co-ordinator request: {}: {}", api, request);
+ ensureCoordinatorReady();
+ RequestHeader header = this.client.nextRequestHeader(api);
+ RequestSend send = new RequestSend(this.consumerCoordinator.id(), header, request);
+ ClientRequest clientRequest = new ClientRequest(now, true, send, handler);
+ this.client.send(clientRequest);
+ }
+
+ /*
+ * Repeatedly attempt to send a request to the co-ordinator until a response is received (retry if we are
+ * disconnected). Note that this means any requests sent this way must be idempotent.
+ *
+ * @return The response
+ */
+ private ClientResponse blockingCoordinatorRequest(ApiKeys api,
+ Struct request,
+ RequestCompletionHandler handler,
+ long now) {
+ do {
+ initiateCoordinatorRequest(api, request, handler, now);
+ List<ClientResponse> responses = this.client.completeAll(consumerCoordinator.id(), now);
+ if (responses.size() == 0) {
+ throw new IllegalStateException("This should not happen.");
+ } else {
+ ClientResponse response = responses.get(responses.size() - 1);
+ if (response.wasDisconnected()) {
+ handleDisconnect(response, time.milliseconds());
+ Utils.sleep(this.retryBackoffMs);
+ } else {
+ return response;
+ }
+ }
+ } while (true);
+ }
+
+ /*
+ * update the current consumer co-ordinator if needed and ensure we have a ready connection to it
+ */
+ private void ensureCoordinatorReady() {
+ while (true) {
+ if (this.consumerCoordinator == null)
+ discoverCoordinator();
+
+ while (true) {
+ boolean ready = this.client.ready(this.consumerCoordinator, time.milliseconds());
+ if (ready) {
+ return;
+ } else {
+ log.debug("No connection to co-ordinator, attempting to connect.");
+ this.client.poll(this.retryBackoffMs, time.milliseconds());
+ ConnectionState state = this.client.connectionState(this.consumerCoordinator.id());
+ if (ConnectionState.DISCONNECTED.equals(state)) {
+ log.debug("Co-ordinator connection failed. Attempting to re-discover.");
+ coordinatorDead();
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ private void discoverCoordinator() {
+ while (this.consumerCoordinator == null) {
+ log.debug("No consumer co-ordinator known, attempting to discover one.");
+ Node coordinator = fetchConsumerCoordinator();
+
+ if (coordinator == null) {
+ log.debug("No co-ordinator found, backing off.");
+ Utils.sleep(this.retryBackoffMs);
+ } else {
+ log.debug("Found consumer co-ordinator: " + coordinator);
+ this.consumerCoordinator = coordinator;
+ }
+ }
+ }
+
+ private Node fetchConsumerCoordinator() {
+ // find a node to ask about the co-ordinator
+ Node node = this.client.leastLoadedNode(time.milliseconds());
+ while (node == null || !this.client.ready(node, time.milliseconds())) {
+ long now = time.milliseconds();
+ this.client.poll(this.retryBackoffMs, now);
+ node = this.client.leastLoadedNode(now);
+ }
+
+ // send the metadata request and process all responses
+ long now = time.milliseconds();
+ this.client.send(createConsumerMetadataRequest(now));
+ List<ClientResponse> responses = this.client.completeAll(node.id(), now);
+ if (responses.isEmpty()) {
+ throw new IllegalStateException("This should not happen.");
+ } else {
+ ClientResponse resp = responses.get(responses.size() - 1);
+ if (!resp.wasDisconnected()) {
+ ConsumerMetadataResponse response = new ConsumerMetadataResponse(resp.responseBody());
+ if (response.errorCode() == Errors.NONE.code())
+ return new Node(Integer.MIN_VALUE, response.node().host(), response.node().port());
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Update our cache of committed positions and then set the fetch position to the committed position (if there is
+ * one) or reset it using the offset reset policy the user has configured.
+ *
+ * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+ * defined
+ */
+ private void fetchMissingPositionsOrResetThem(Set<TopicPartition> partitions, long now) {
+ // update the set of committed offsets
+ refreshCommittedOffsets(now, partitions);
+
+ // reset the fetch position to the committed position
+ for (TopicPartition tp : partitions) {
+ if (subscriptions.fetched(tp) == null) {
+ if (subscriptions.committed(tp) == null) {
+ resetOffset(tp, now);
+ } else {
+ log.debug("Resetting offset for partition {} to committed offset");
+ subscriptions.seek(tp, subscriptions.committed(tp));
+ }
+ }
+ }
+ }
+
+ /*
+ * Fetch the given set of partitions and update the cache of committed offsets using the result
+ */
+ private void refreshCommittedOffsets(long now, Set<TopicPartition> partitions) {
+ log.debug("Fetching committed offsets for partitions: " + Utils.join(partitions, ", "));
+ OffsetFetchRequest request = new OffsetFetchRequest(this.group, new ArrayList<TopicPartition>(partitions));
+ ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.OFFSET_FETCH, request.toStruct(), null, now);
+ OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
+ for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ OffsetFetchResponse.PartitionData data = entry.getValue();
+ if (data.hasError()) {
+ log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
+ .exception()
+ .getMessage());
+ } else if (data.offset >= 0) {
+ // update the position with the offset (-1 seems to indicate no
+ // such offset known)
+ this.subscriptions.committed(tp, data.offset);
+ } else {
+ log.debug("No committed offset for partition " + tp);
+ }
+ }
+ }
+
+ /*
+ * Fetch a single offset before the given timestamp for the partition.
+ */
+ private long listOffset(TopicPartition tp, long ts) {
+ log.debug("Fetching offsets for partition {}.", tp);
+ Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
+ partitions.put(tp, new ListOffsetRequest.PartitionData(ts, 1));
+ while (true) {
+ long now = time.milliseconds();
+ PartitionInfo info = metadata.fetch().partition(tp);
+ if (info == null) {
+ metadata.add(tp.topic());
+ awaitMetadataUpdate();
+ } else if (info.leader() == null) {
+ awaitMetadataUpdate();
+ } else if (this.client.ready(info.leader(), now)) {
+ Node node = info.leader();
+ ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
+ RequestSend send = new RequestSend(node.id(),
+ this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
+ request.toStruct());
+ ClientRequest clientRequest = new ClientRequest(now, true, send, null);
+ this.client.send(clientRequest);
+ List<ClientResponse> responses = this.client.completeAll(node.id(), now);
+ if (responses.isEmpty())
+ throw new IllegalStateException("This should not happen.");
+ ClientResponse response = responses.get(responses.size() - 1);
+ if (response.wasDisconnected()) {
+ awaitMetadataUpdate();
+ } else {
+ ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
+ short errorCode = lor.responseData().get(tp).errorCode;
+ if (errorCode == Errors.NONE.code()) {
+ List<Long> offsets = lor.responseData().get(tp).offsets;
+ if (offsets.size() != 1)
+ throw new IllegalStateException("This should not happen.");
+ return offsets.get(0);
+ } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
+ || errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
+ log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+ tp);
+ awaitMetadataUpdate();
+ continue;
+ } else {
+ Errors.forCode(errorCode).maybeThrow();
+ }
+ }
+ } else {
+ client.poll(this.retryBackoffMs, now);
+ }
+ }
+ }
+
+ /*
+ * Create a consumer metadata request for the given group
+ */
+ private ClientRequest createConsumerMetadataRequest(long now) {
+ ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.group);
+ Node destination = this.client.leastLoadedNode(now);
+ if (destination == null) // all nodes are blacked out
+ return null;
+ RequestSend send = new RequestSend(destination.id(),
+ this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA),
+ request.toStruct());
+ ClientRequest consumerMetadataRequest = new ClientRequest(now, true, send, null);
+ return consumerMetadataRequest;
+ }
+
+ /**
+ * Reset offsets for the given partition using the offset reset strategy
+ *
+ * @throws NoOffsetForPartitionException If no offset reset strategy is defined
+ */
+ private void resetOffset(TopicPartition partition, long now) {
+ long timestamp;
+ if (this.offsetResetStrategy == AutoOffsetResetStrategy.EARLIEST)
+ timestamp = EARLIEST_OFFSET_TIMESTAMP;
+ else if (this.offsetResetStrategy == AutoOffsetResetStrategy.LATEST)
+ timestamp = LATEST_OFFSET_TIMESTAMP;
+ else
+ throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
+
+ log.debug("Resetting offset for partition {} to {} offset.", partition, this.offsetResetStrategy.name()
+ .toLowerCase());
+ this.subscriptions.seek(partition, listOffset(partition, timestamp));
+ }
+
+ private void handleDisconnect(ClientResponse response, long now) {
+ int correlation = response.request().request().header().correlationId();
+ log.debug("Cancelled request {} with correlation id {} due to node {} being disconnected",
+ response.request(),
+ correlation,
+ response.request().request().destination());
+ if (this.consumerCoordinator != null
+ && response.request().request().destination() == this.consumerCoordinator.id())
+ coordinatorDead();
+ }
+
+ /*
+ * Check that the consumer hasn't been closed.
+ */
+ private void ensureNotClosed() {
+ if (this.closed)
+ throw new IllegalStateException("This consumer has already been closed.");
+ }
+
+ private static class PartitionRecords<K, V> {
+ public long fetchOffset;
+ public TopicPartition partition;
+ public List<ConsumerRecord<K, V>> records;
+
+ public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
+ this.fetchOffset = fetchOffset;
+ this.partition = partition;
+ this.records = records;
+ }
+ }
+
+ private static enum AutoOffsetResetStrategy {
+ LATEST, EARLIEST, NONE;
+ }
+
+ private class ConsumerMetrics {
+ public final Metrics metrics;
+ public final Sensor bytesFetched;
+ public final Sensor recordsFetched;
+ public final Sensor fetchLatency;
+ public final Sensor commitLatency;
+ public final Sensor partitionReassignments;
+ public final Sensor heartbeatLatency;
+ public final Sensor lag;
+
+ public ConsumerMetrics(Metrics metrics, String metricsGroup, Map<String, String> tags) {
+ this.metrics = metrics;
+
+ this.bytesFetched = metrics.sensor("bytes-fetched");
+ this.bytesFetched.add(new MetricName("fetch-size-avg",
+ metricsGroup,
+ "The average number of bytes fetched per request",
+ tags), new Avg());
+ this.bytesFetched.add(new MetricName("fetch-size-max",
+ metricsGroup,
+ "The maximum number of bytes fetched per request",
+ tags), new Max());
+ this.bytesFetched.add(new MetricName("bytes-consumed-rate",
+ metricsGroup,
+ "The average number of bytes consumed per second",
+ tags), new Rate());
+
+ this.recordsFetched = metrics.sensor("records-fetched");
+ this.recordsFetched.add(new MetricName("records-per-request-avg",
+ metricsGroup,
+ "The average number of records in each request",
+ tags), new Avg());
+ this.recordsFetched.add(new MetricName("records-consumed-rate",
+ metricsGroup,
+ "The average number of records consumed per second",
+ tags), new Rate());
+
+ this.fetchLatency = metrics.sensor("fetch-latency");
+ this.fetchLatency.add(new MetricName("fetch-latency-avg",
+ metricsGroup,
+ "The average time taken for a fetch request.",
+ tags), new Avg());
+ this.fetchLatency.add(new MetricName("fetch-latency-max",
+ metricsGroup,
+ "The max time taken for any fetch request.",
+ tags), new Max());
+ this.fetchLatency.add(new MetricName("fetch-rate",
+ metricsGroup,
+ "The number of fetch requests per second.",
+ tags), new Rate(new Count()));
+
+ this.commitLatency = metrics.sensor("commit-latency");
+ this.commitLatency.add(new MetricName("commit-latency-avg",
+ metricsGroup,
+ "The average time taken for a commit request",
+ tags), new Avg());
+ this.commitLatency.add(new MetricName("commit-latency-max",
+ metricsGroup,
+ "The max time taken for a commit request",
+ tags), new Max());
+ this.commitLatency.add(new MetricName("commit-rate",
+ metricsGroup,
+ "The number of commit calls per second",
+ tags), new Rate(new Count()));
+
+ this.partitionReassignments = metrics.sensor("reassignment-latency");
+ this.partitionReassignments.add(new MetricName("reassignment-time-avg",
+ metricsGroup,
+ "The average time taken for a partition reassignment",
+ tags), new Avg());
+ this.partitionReassignments.add(new MetricName("reassignment-time-max",
+ metricsGroup,
+ "The max time taken for a partition reassignment",
+ tags), new Max());
+ this.partitionReassignments.add(new MetricName("reassignment-rate",
+ metricsGroup,
+ "The number of partition reassignments per second",
+ tags), new Rate(new Count()));
+
+ this.heartbeatLatency = metrics.sensor("heartbeat-latency");
+ this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
+ metricsGroup,
+ "The max time taken to receive a response to a hearbeat request",
+ tags), new Max());
+ this.heartbeatLatency.add(new MetricName("heartbeat-rate",
+ metricsGroup,
+ "The average number of heartbeats per second",
+ tags), new Rate(new Count()));
+
+ this.lag = metrics.sensor("lag");
+ this.lag.add(new MetricName("lag-max",
+ metricsGroup,
+ "The maximum lag for any partition in this window",
+ tags), new Max());
+
+ metrics.addMetric(new MetricName("assigned-partitions",
+ metricsGroup,
+ "The number of partitions currently assigned to this consumer",
+ tags), new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return subscriptions.assignedPartitions().size();
+ }
+ });
+
+ metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
+ metricsGroup,
+ "The number of seconds since the last controller heartbeat",
+ tags), new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
+ }
+ });
+ }
+
+ public void recordTopicFetchMetrics(String topic, int bytes, int records) {
+ // record bytes fetched
+ String name = "topic." + topic + ".bytes-fetched";
+ Sensor bytesFetched = this.metrics.getSensor(name);
+ if (bytesFetched == null)
+ bytesFetched = this.metrics.sensor(name);
+ bytesFetched.record(bytes);
+
+ // record records fetched
+ name = "topic." + topic + ".records-fetched";
+ Sensor recordsFetched = this.metrics.getSensor(name);
+ if (recordsFetched == null)
+ recordsFetched = this.metrics.sensor(name);
+ recordsFetched.record(records);
+ }
+ }
}
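
The blockingCoordinatorRequest and discoverCoordinator paths above share one control flow: send the request, take the last completed response, and on a disconnect back off for retry.backoff.ms before resending, which is why such requests must be idempotent. A minimal, generic sketch of that loop in plain Java, deliberately not using the Kafka networking classes (all names here are illustrative):

import java.io.IOException;

// Retry-until-answered helper mirroring blockingCoordinatorRequest: resend on disconnect
// after a fixed backoff, so the supplied request must be safe to repeat.
public final class BlockingRetrySketch {

    interface Attempt<T> {
        T send() throws IOException; // throws on disconnect, analogous to ClientResponse.wasDisconnected()
    }

    public static <T> T untilAnswered(Attempt<T> attempt, long retryBackoffMs) throws InterruptedException {
        while (true) {
            try {
                return attempt.send();            // got a usable response, hand it back
            } catch (IOException disconnected) {  // connection dropped before a response arrived
                Thread.sleep(retryBackoffMs);     // analogous to Utils.sleep(this.retryBackoffMs)
            }
        }
    }
}
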
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index fa88ac1a8b19..f50da8257569 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -9,185 +9,174 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
-*/
+ */
package org.apache.kafka.clients.consumer;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.MetricName;
/**
- * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka.
- * This class is not threadsafe
+ * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is not
+ * threadsafe
*
- * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it
- * needs to communicate with. Failure to close the consumer after use will leak these resources.
+ * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it needs to
+ * communicate with. Failure to close the consumer after use will leak these resources.
*/
-public class MockConsumer implements Consumer {
+public class MockConsumer implements Consumer<byte[], byte[]> {
+
+ private final Map<String, List<PartitionInfo>> partitions;
+ private final SubscriptionState subscriptions;
+ private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records;
+ private boolean closed;
- private final Set<TopicPartition> subscribedPartitions;
- private final Set<String> subscribedTopics;
- private final Map<TopicPartition, Long> committedOffsets;
- private final Map<TopicPartition, Long> consumedOffsets;
-
public MockConsumer() {
- subscribedPartitions = new HashSet<TopicPartition>();
- subscribedTopics = new HashSet<String>();
- committedOffsets = new HashMap<TopicPartition, Long>();
- consumedOffsets = new HashMap<TopicPartition, Long>();
+ this.subscriptions = new SubscriptionState();
+ this.partitions = new HashMap<String, List<PartitionInfo>>();
+ this.records = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
+ this.closed = false;
}
@Override
- public void subscribe(String... topics) {
- if(subscribedPartitions.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
- for(String topic : topics) {
- subscribedTopics.add(topic);
- }
+ public synchronized Set<TopicPartition> subscriptions() {
+ return this.subscriptions.assignedPartitions();
}
@Override
- public void subscribe(TopicPartition... partitions) {
- if(subscribedTopics.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
- for(TopicPartition partition : partitions) {
- subscribedPartitions.add(partition);
- consumedOffsets.put(partition, 0L);
- }
+ public synchronized void subscribe(String... topics) {
+ ensureNotClosed();
+ for (String topic : topics)
+ this.subscriptions.subscribe(topic);
}
- public void unsubscribe(String... topics) {
- // throw an exception if the topic was never subscribed to
- for(String topic:topics) {
- if(!subscribedTopics.contains(topic))
- throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" +
- " to unsubscribe(" + topic + ")");
- subscribedTopics.remove(topic);
- }
+ @Override
+ public synchronized void subscribe(TopicPartition... partitions) {
+ ensureNotClosed();
+ for (TopicPartition partition : partitions)
+ this.subscriptions.subscribe(partition);
}
- public void unsubscribe(TopicPartition... partitions) {
- // throw an exception if the partition was never subscribed to
- for(TopicPartition partition:partitions) {
- if(!subscribedPartitions.contains(partition))
- throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" +
- partition.topic() + "," + partition.partition() + ") should be called prior" +
- " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
- subscribedPartitions.remove(partition);
- committedOffsets.remove(partition);
- consumedOffsets.remove(partition);
- }
+ public synchronized void unsubscribe(String... topics) {
+ ensureNotClosed();
+ for (String topic : topics)
+ this.subscriptions.unsubscribe(topic);
+ }
+
+ public synchronized void unsubscribe(TopicPartition... partitions) {
+ ensureNotClosed();
+ for (TopicPartition partition : partitions)
+ this.subscriptions.unsubscribe(partition);
}
@Override
- public Map> poll(long timeout) {
- // hand out one dummy record, 1 per topic
- Map> records = new HashMap>();
- Map> recordMetadata = new HashMap>();
- for(TopicPartition partition : subscribedPartitions) {
- // get the last consumed offset
- long messageSequence = consumedOffsets.get(partition);
- ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
- ObjectOutputStream outputStream;
- try {
- outputStream = new ObjectOutputStream(byteStream);
- outputStream.writeLong(messageSequence++);
- outputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- List recordsForTopic = records.get(partition.topic());
- if(recordsForTopic == null) {
- recordsForTopic = new ArrayList();
- records.put(partition.topic(), recordsForTopic);
- }
- recordsForTopic.add(new ConsumerRecord(partition.topic(), partition.partition(), null, byteStream.toByteArray(), messageSequence));
- consumedOffsets.put(partition, messageSequence);
+ public synchronized ConsumerRecords<byte[], byte[]> poll(long timeout) {
+ ensureNotClosed();
+ // update the consumed offset
+ for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry : this.records.entrySet()) {
+ List<ConsumerRecord<byte[], byte[]>> recs = entry.getValue();
+ if (!recs.isEmpty())
+ this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
}
- for(Entry> recordsPerTopic : records.entrySet()) {
- Map> recordsPerPartition = new HashMap>();
- for(ConsumerRecord record : recordsPerTopic.getValue()) {
- List recordsForThisPartition = recordsPerPartition.get(record.partition());
- if(recordsForThisPartition == null) {
- recordsForThisPartition = new ArrayList();
- recordsPerPartition.put(record.partition(), recordsForThisPartition);
- }
- recordsForThisPartition.add(record);
- }
- recordMetadata.put(recordsPerTopic.getKey(), new ConsumerRecords(recordsPerTopic.getKey(), recordsPerPartition));
+
+ ConsumerRecords<byte[], byte[]> copy = new ConsumerRecords<byte[], byte[]>(this.records);
+ this.records = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
+ return copy;
+ }
+
+ public synchronized void addRecord(ConsumerRecord<byte[], byte[]> record) {
+ ensureNotClosed();
+ TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+ this.subscriptions.assignedPartitions().add(tp);
+ List<ConsumerRecord<byte[], byte[]>> recs = this.records.get(tp);
+ if (recs == null) {
+ recs = new ArrayList<ConsumerRecord<byte[], byte[]>>();
+ this.records.put(tp, recs);
}
- return recordMetadata;
+ recs.add(record);
}
@Override
- public OffsetMetadata commit(Map offsets, boolean sync) {
- if(!sync)
- return null;
- for(Entry partitionOffset : offsets.entrySet()) {
- committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue());
- }
- return new OffsetMetadata(committedOffsets, null);
+ public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+ ensureNotClosed();
+ for (Entry<TopicPartition, Long> entry : offsets.entrySet())
+ subscriptions.committed(entry.getKey(), entry.getValue());
}
@Override
- public OffsetMetadata commit(boolean sync) {
- if(!sync)
- return null;
- return commit(consumedOffsets, sync);
+ public synchronized void commit(CommitType commitType) {
+ ensureNotClosed();
+ commit(this.subscriptions.allConsumed(), commitType);
}
@Override
- public void seek(Map offsets) {
- // change the fetch offsets
- for(Entry partitionOffset : offsets.entrySet()) {
- consumedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue());
- }
+ public synchronized void seek(TopicPartition partition, long offset) {
+ ensureNotClosed();
+ subscriptions.seek(partition, offset);
}
@Override
- public Map committed(Collection partitions) {
- Map offsets = new HashMap();
- for(TopicPartition partition : partitions) {
- offsets.put(new TopicPartition(partition.topic(), partition.partition()), committedOffsets.get(partition));
- }
- return offsets;
+ public synchronized long committed(TopicPartition partition) {
+ ensureNotClosed();
+ return subscriptions.committed(partition);
}
@Override
- public Map position(Collection partitions) {
- Map positions = new HashMap();
- for(TopicPartition partition : partitions) {
- positions.put(partition, consumedOffsets.get(partition));
- }
- return positions;
+ public synchronized long position(TopicPartition partition) {
+ ensureNotClosed();
+ return subscriptions.consumed(partition);
+ }
+
+ @Override
+ public synchronized void seekToBeginning(TopicPartition... partitions) {
+ ensureNotClosed();
+ throw new UnsupportedOperationException();
}
@Override
- public Map offsetsBeforeTime(long timestamp,
- Collection partitions) {
+ public synchronized void seekToEnd(TopicPartition... partitions) {
+ ensureNotClosed();
throw new UnsupportedOperationException();
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
- return null;
+ ensureNotClosed();
+ return Collections.emptyMap();
}
@Override
- public void close() {
- // unsubscribe from all partitions
- TopicPartition[] allPartitions = new TopicPartition[subscribedPartitions.size()];
- unsubscribe(subscribedPartitions.toArray(allPartitions));
+ public synchronized List<PartitionInfo> partitionsFor(String topic) {
+ ensureNotClosed();
+ List<PartitionInfo> parts = this.partitions.get(topic);
+ if (parts == null)
+ return Collections.emptyList();
+ else
+ return parts;
+ }
+
+ public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
+ ensureNotClosed();
+ this.partitions.put(topic, partitions);
+ }
+
+ @Override
+ public synchronized void close() {
+ ensureNotClosed();
+ this.closed = true;
+ }
+
+ private void ensureNotClosed() {
+ if (this.closed)
+ throw new IllegalStateException("This consumer has already been closed.");
}
}
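
A short usage sketch of the rewritten MockConsumer, relying only on methods visible above (subscribe, seek, addRecord, poll, position, close); the topic name and record contents are arbitrary, and the record constructor shape mirrors parseRecord in KafkaConsumer:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerSketch {
    public static void main(String[] args) {
        MockConsumer consumer = new MockConsumer();
        TopicPartition tp = new TopicPartition("test-topic", 0);
        consumer.subscribe(tp);
        consumer.seek(tp, 0L);

        // queue a record for the next poll
        consumer.addRecord(new ConsumerRecord<byte[], byte[]>("test-topic", 0, 0L, null, "hello".getBytes()));

        ConsumerRecords<byte[], byte[]> records = consumer.poll(0);
        // poll() advances the mock's consumed position to the offset of the last record handed out
        System.out.println("records polled, position now " + consumer.position(tp));
        consumer.close();
    }
}
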
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
new file mode 100644
index 000000000000..a21f97be5c2e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Indicates that there is no stored offset and no defined offset reset policy
+ */
+public class NoOffsetForPartitionException extends KafkaException {
+
+ private static final long serialVersionUID = 1L;
+
+ public NoOffsetForPartitionException(String message) {
+ super(message);
+ }
+
+}
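
resetOffset in KafkaConsumer throws this exception when a partition has neither a committed offset nor a configured reset policy. A hedged handling sketch, assuming a consumer whose reset policy is effectively "none"; the topic name and fallback offset are arbitrary choices for the example:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.TopicPartition;

public class OffsetFallbackSketch {
    // assumed: `consumer` has no committed offsets and no offset reset policy applies
    public static void pollWithFallback(Consumer<byte[], byte[]> consumer) {
        try {
            consumer.poll(100);
        } catch (NoOffsetForPartitionException e) {
            // no stored position and no reset policy: choose a starting point explicitly
            consumer.seek(new TopicPartition("my-topic", 0), 0L);
        }
    }
}
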
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
deleted file mode 100644
index ea423ad15eeb..000000000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import java.util.Map;
-
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * The metadata for an offset commit that has been acknowledged by the server
- */
-public final class OffsetMetadata {
-
- private final Map offsets;
- private final Map errors;
-
- public OffsetMetadata(Map offsets, Map errors) {
- super();
- this.offsets = offsets;
- this.errors = errors;
- }
-
- public OffsetMetadata(Map offsets) {
- this(offsets, null);
- }
-
- /**
- * The offset of the record in the topic/partition.
- */
- public long offset(TopicPartition partition) {
- if(this.errors != null)
- throw errors.get(partition);
- return offsets.get(partition);
- }
-
- /**
- * @return The exception corresponding to the error code returned by the server
- */
- public Exception error(TopicPartition partition) {
- if(errors != null)
- return errors.get(partition);
- else
- return null;
- }
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
new file mode 100644
index 000000000000..d9483ecf6ae4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -0,0 +1,47 @@
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * A helper class for managing the heartbeat to the co-ordinator
+ */
+public final class Heartbeat {
+
+ /* The number of heartbeats to attempt to complete per session timeout interval.
+ * so, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
+ * once per second.
+ */
+ private final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
+
+ private final long timeout;
+ private long lastHeartbeatSend;
+ private long lastHeartbeatResponse;
+
+ public Heartbeat(long timeout, long now) {
+ this.timeout = timeout;
+ this.lastHeartbeatSend = now;
+ this.lastHeartbeatResponse = now;
+ }
+
+ public void sentHeartbeat(long now) {
+ this.lastHeartbeatSend = now;
+ }
+
+ public void receivedResponse(long now) {
+ this.lastHeartbeatResponse = now;
+ }
+
+ public void markDead() {
+ this.lastHeartbeatResponse = -1;
+ }
+
+ public boolean isAlive(long now) {
+ return now - lastHeartbeatResponse <= timeout;
+ }
+
+ public boolean shouldHeartbeat(long now) {
+ return now - lastHeartbeatSend > (1.0 / HEARTBEATS_PER_SESSION_INTERVAL) * this.timeout;
+ }
+
+ public long lastHeartbeatSend() {
+ return this.lastHeartbeatSend;
+ }
+}
\ No newline at end of file
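
A small sketch of the cadence this helper encodes: with a 3000 ms session timeout and three heartbeats per session interval, shouldHeartbeat() turns true roughly every second, while isAlive() holds as long as a response arrived within the timeout window. The timestamps below are arbitrary:

import org.apache.kafka.clients.consumer.internals.Heartbeat;

public class HeartbeatCadenceSketch {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Heartbeat heartbeat = new Heartbeat(3000L, now);

        System.out.println(heartbeat.shouldHeartbeat(now + 500));   // false: under a third of the timeout
        System.out.println(heartbeat.shouldHeartbeat(now + 1500));  // true: time to send again

        heartbeat.sentHeartbeat(now + 1500);
        heartbeat.receivedResponse(now + 1600);
        System.out.println(heartbeat.isAlive(now + 4000));          // true: last response 2400 ms ago
        System.out.println(heartbeat.isAlive(now + 5000));          // false: 3400 ms without a response
    }
}
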
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
new file mode 100644
index 000000000000..7e57a39690d9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Collection;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
+import org.apache.kafka.common.TopicPartition;
+
+public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
+
+ @Override
+ public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+
+ @Override
+ public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
+
+}
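
A sketch of a non-trivial callback built on the same interface, committing offsets synchronously before partitions are taken away; CommitType.SYNC is assumed here to be the synchronous constant of the CommitType enum referenced elsewhere in this patch:

import java.util.Collection;

import org.apache.kafka.clients.consumer.CommitType;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevokeCallback implements ConsumerRebalanceCallback {

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // nothing to do: positions are established lazily on the next poll
    }

    @Override
    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // flush consumed positions before ownership moves to another consumer
        consumer.commit(CommitType.SYNC); // assumption: CommitType exposes a SYNC constant
    }
}
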
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
new file mode 100644
index 000000000000..71ce20db955b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -0,0 +1,166 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A class for tracking the topics, partitions, and offsets for the consumer
+ */
+public class SubscriptionState {
+
+ /* the list of topics the user has requested */
+ private final Set<String> subscribedTopics;
+
+ /* the list of partitions the user has requested */
+ private final Set<TopicPartition> subscribedPartitions;
+
+ /* the list of partitions currently assigned */
+ private final Set<TopicPartition> assignedPartitions;
+
+ /* the offset exposed to the user */
+ private final Map<TopicPartition, Long> consumed;
+
+ /* the current point we have fetched up to */
+ private final Map<TopicPartition, Long> fetched;
+
+ /* the last committed offset for each partition */
+ private final Map