KAFKA-3718: propagate all KafkaConfig __consumer_offsets configs to OffsetConfig instantiation #1394

Closed
wants to merge 1 commit into apache:trunk from onurkaraman:KAFKA-3718

Conversation

onurkaraman
Contributor

Kafka has two configurable compression codecs: the one used by the client (the source codec) and the one ultimately used when storing messages in the log (the target codec). The target codec defaults to KafkaConfig.compressionType and can be dynamically configured through ZooKeeper.

The GroupCoordinator appends group membership information into the __consumer_offsets topic by:

  1. making a message with group membership information
  2. making a MessageSet with the single message compressed with the source codec
  3. doing a log.append on the MessageSet

Without this patch, KafkaConfig.offsetsTopicCompressionCodec doesn't get propagated to the OffsetConfig instantiation, so GroupMetadataManager uses a source codec of NoCompressionCodec when making the MessageSet. Suppose we have enough group information that the resulting message exceeds KafkaConfig.messageMaxBytes before compression but would fall below that threshold after compression with our source codec. Even if we had dynamically configured __consumer_offsets with our favorite compression codec, log.append will throw a RecordTooLargeException in analyzeAndValidateMessageSet because the message is unexpectedly uncompressed instead of having been compressed with the source codec defined by KafkaConfig.offsetsTopicCompressionCodec.
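
For reference, the fix amounts to building the coordinator's OffsetConfig from the broker's KafkaConfig instead of relying on OffsetConfig's defaults. A rough sketch of that propagation is below; the field names are taken from the KafkaConfig/OffsetConfig referenced in this thread and the helper name is made up for illustration, so treat it as a sketch rather than the exact diff.

import kafka.coordinator.OffsetConfig
import kafka.server.KafkaConfig

// Hypothetical helper (name invented for illustration): build the coordinator's OffsetConfig
// from KafkaConfig so that offsets.topic.compression.codec is no longer dropped.
def offsetConfigFor(config: KafkaConfig): OffsetConfig =
  OffsetConfig(
    maxMetadataSize = config.offsetMetadataMaxSize,
    loadBufferSize = config.offsetsLoadBufferSize,
    offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
    offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
    offsetsTopicNumPartitions = config.offsetsTopicPartitions,
    offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
    offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
    offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec, // the value that was previously not propagated
    offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
    offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)

With something like this in place, GroupMetadataManager compresses the group metadata MessageSet with the configured source codec before log.append, so the size check sees the compressed size.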

@omkreddy
Contributor

Some time back I submitted a similar patch for the old ConsumerCoordinator (now replaced by the GroupCoordinator):
https://issues.apache.org/jira/browse/KAFKA-2159. I will close the KAFKA-2159 JIRA.

@omkreddy
Contributor

@ijuma
Contributor

ijuma commented May 17, 2016

Thanks for the PR. Can we have a test for this? @hachikuji, can you please take a look?

@onurkaraman
Contributor Author

@ijuma I'm not really sure how you want me to test this.

If we really wanted to, I guess I could unit test the GroupCoordinator constructor by instantiating a GroupCoordinator with a custom KafkaConfig.offsetsTopicCompressionCodec and verifying that groupCoordinator.offsetConfig.offsetsTopicCompressionCodec picked up the override. Testing constructors seems kinda lame though.
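
(Concretely, the check being described would look roughly like the snippet below. It reuses the hypothetical offsetConfigFor helper sketched under the PR description instead of constructing a full GroupCoordinator, so it is illustrative only and not the test that was eventually added.)

import java.util.Properties
import kafka.message.GZIPCompressionCodec
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.junit.Assert._

// Override the broker property, rebuild the config, and check that the codec survives the mapping.
val props: Properties = TestUtils.createBrokerConfig(0, "localhost:2181")
props.put(KafkaConfig.OffsetsTopicCompressionCodecProp, GZIPCompressionCodec.codec.toString)
val config = KafkaConfig.fromProps(props)
assertEquals(GZIPCompressionCodec, offsetConfigFor(config).offsetsTopicCompressionCodec)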

@onurkaraman
Contributor Author

@omkreddy I think the default compression.type (target codec) is "producer", meaning the broker adopts whatever codec the incoming MessageSet was compressed with. So it seems that updating the property in GroupCoordinator.offsetsTopicConfigs isn't really required. Thoughts?

Pinging @hachikuji for review.
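
(To spell out the "producer" behaviour: when a topic's compression.type is "producer", the broker keeps whatever codec the incoming MessageSet already uses; a concrete codec forces recompression. A simplified illustration, not the broker's actual implementation:)

import kafka.message.{CompressionCodec, GZIPCompressionCodec, NoCompressionCodec}

// Simplified target-codec selection: "producer" adopts the source codec, anything else recompresses.
def targetCodec(topicCompressionType: String, sourceCodec: CompressionCodec): CompressionCodec =
  if (topicCompressionType == "producer") sourceCodec
  else CompressionCodec.getCompressionCodec(topicCompressionType)

// targetCodec("producer", GZIPCompressionCodec) == GZIPCompressionCodec
// targetCodec("gzip", NoCompressionCodec)       == GZIPCompressionCodec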

@hachikuji

@onurkaraman Good find. I guess the only difference if we set compression.type is how we'd react to a future config change. For example, if we initially set offsets.topic.compression.codec to snappy and write that to Zk as the topic's compression.type, then if we later change offsets.topic.compression.codec to gzip, we'd first compress as gzip and then recompress as snappy on writing (unless we change the code to alter the topic configuration). Alternatively, with the change as it is, we'll leave the data compressed according to whatever the current configuration is. That seems preferable to me.

But actually I'm a little confused. Isn't the compression.type for this topic explicitly set to uncompressed on the line @omkreddy points to above? Wouldn't that mean that we'd decompress before writing even with this patch? Seems like we actually need to change that to "producer". Maybe I'm missing something?

@omkreddy
Contributor

omkreddy commented May 19, 2016

@hachikuji Yes, we should set "producer" as compression.type.

props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)

@onurkaraman as mentioned by @hachikuji, for this topic we are overriding the target codec to uncompressed. We should either change it to "producer" or remove the line and change the default OffsetConfig codec below to "producer":
https://github.com/onurkaraman/kafka/blob/d65726de2e4a196837de7bb2e11f397482826dca/core/src/main/scala/kafka/coordinator/OffsetConfig.scala#L58
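
(For context, the override under discussion lives in the offsets-topic properties the coordinator supplies when the topic is created. A sketch of that method with the proposed change is below; it is parameterized for illustration and the exact set of properties in GroupCoordinator.offsetsTopicConfigs is assumed rather than copied from the source.)

import java.util.Properties
import kafka.coordinator.OffsetConfig
import kafka.log.LogConfig
import kafka.message.ProducerCompressionCodec

// Sketch of the offsets-topic properties with the proposed change.
def offsetsTopicConfigs(offsetConfig: OffsetConfig): Properties = {
  val props = new Properties
  props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
  props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
  // was: props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
  props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name) // adopt the source codec
  props
}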

@onurkaraman
Contributor Author

Yeah, my bad. My previous comment wasn't right. Unless the config is overridden, compression.type is producer. But we explicitly set the compression.type for the __consumer_offsets topic on the line @omkreddy pointed to.

It looks like compression.type can only be configured at the per-topic level, either at topic creation through AdminUtils.createTopic or through dynamic config with kafka-configs.sh. I had been using dynamic config.
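
(For the dynamic-config path, the per-topic override looks roughly like this when done programmatically; the AdminUtils/ZkUtils calls are a sketch against the ZooKeeper-based config of this Kafka version, and gzip is just an example value.)

import java.util.Properties
import kafka.admin.AdminUtils
import kafka.log.LogConfig
import kafka.utils.ZkUtils

// Override compression.type for __consumer_offsets via the dynamic topic config stored in ZooKeeper.
val zkUtils = ZkUtils("localhost:2181", 30000, 30000, isZkSecurityEnabled = false)
try {
  val props = new Properties
  props.put(LogConfig.CompressionTypeProp, "gzip")
  AdminUtils.changeTopicConfig(zkUtils, "__consumer_offsets", props)
} finally {
  zkUtils.close()
}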

@onurkaraman force-pushed the KAFKA-3718 branch 2 times, most recently from 4dd0ab3 to 10b21e8 on May 19, 2016 at 07:32
@onurkaraman
Contributor Author

I force pushed an update that explicitly sets the compression.type to producer so that the target codec always just adopts the source codec.

This seems better than removing the property altogether, since then the target codec would fall back to whatever the broker-level compression.type happens to be at the time, which may not be producer. It would probably be surprising to set offsetsTopicCompressionCodec only to see the final target codec be something else.

@ijuma
Contributor

ijuma commented May 19, 2016

@onurkaraman, with regards to testing, I think it might be useful to have an integration test where we configure the offsets topic to be compressed and then we verify that the topic is compressed with the expected compression algorithm.

@omkreddy
Contributor

LGTM

@hachikuji

LGTM. An integration test case would be nice if it's not too much trouble. Maybe we could commit an offset and then read it back using the simple consumer and verify that the right codec was used?

@onurkaraman
Contributor Author

onurkaraman commented May 19, 2016

@ijuma I force pushed an update that adds an integration test checking that the offsetsTopicCompressionCodec has been propagated.

@onurkaraman
Contributor Author

@ijuma any chance of checking this in today now that 0.10.0.0 is released?

@ijuma
Contributor

ijuma commented May 24, 2016

@onurkaraman, I'll try, but the day is almost over in London.

@onurkaraman
Contributor Author

Right. Timezones. Sorry I forgot.

import org.junit.Assert._


class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
ijuma commented May 26, 2016

Thanks for adding the test. I made a few changes to make it a bit more readable and to improve the error if the test fails:

package integration.kafka.api

import kafka.common.TopicAndPartition
import kafka.integration.KafkaServerTestHarness
import kafka.log.Log
import kafka.message.GZIPCompressionCodec
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.TopicConstants
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.Test
import org.junit.Assert._

import scala.collection.JavaConverters._
import java.util.Properties

class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
  val offsetsTopicCompressionCodec = GZIPCompressionCodec
  val overridingProps = new Properties()
  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
  overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString)

  override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map {
    KafkaConfig.fromProps(_, overridingProps)
  }

  @Test
  def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec() {
    val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
      securityProtocol = SecurityProtocol.PLAINTEXT)
    // Committing an offset makes the GroupCoordinator append a message to __consumer_offsets.
    val offsetMap = Map(
      new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "")
    ).asJava
    consumer.commitSync(offsetMap)
    val logManager = servers.head.getLogManager

    def getGroupMetadataLogOpt: Option[Log] =
      logManager.getLog(TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0))

    TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.nonEmpty)),
      "Commit message not appended in time")

    // Every appended message should carry the codec configured via offsets.topic.compression.codec.
    val logSegments = getGroupMetadataLogOpt.get.logSegments
    val incorrectCompressionCodecs = logSegments.flatMap(_.log.map(_.message.compressionCodec)).filter(_ != offsetsTopicCompressionCodec)
    assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs)

    consumer.close()
  }
}

What do you think?

@ijuma
Contributor

ijuma commented May 26, 2016

@onurkaraman, I left one suggestion. If you agree and integrate that, then I think we can merge this PR.

@onurkaraman
Contributor Author

Thanks for the cleanup @ijuma. Looks a lot cleaner. I force pushed the change.

@ijuma
Contributor

ijuma commented May 26, 2016

Thanks Onur, LGTM. Merging to trunk and 0.10.0.

asfgit pushed a commit that referenced this pull request May 26, 2016
KAFKA-3718; propagate all KafkaConfig __consumer_offsets configs to OffsetConfig instantiation

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1394 from onurkaraman/KAFKA-3718

(cherry picked from commit 62dc1af)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
asfgit closed this in 62dc1af on May 26, 2016
bbejeck added a commit to bbejeck/kafka that referenced this pull request Jun 2, 2016
gfodor pushed a commit to AltspaceVR/kafka that referenced this pull request Jun 3, 2016
granthenke pushed a commit to granthenke/kafka that referenced this pull request Oct 24, 2016