KAFKA-2983: Remove Scala consumers and related code #5230
Conversation
@lindong28, do you have time to take a look? I started a system tests run, but maybe you could take an initial pass, if you have time.
@ijuma Sure! I will review it.
Force-pushed from 81407be to 92da9bd
@lindong28 I rebased as a PR had been merged that had broken this branch. I also added a description to the pull request of what is included in the PR.
@rajinisivaram, @lindong28 said that he will be able to review this PR.
Thanks for the patch! Just have some minor comments. LGTM. I think we can commit the patch after it passes all system tests.
```scala
// We need the two while loop to make sure when old consumer is used, even there is no message we
// still commit offset. When new consumer is used, this is handled by poll(timeout).
// FIXME Is it safe to remove one of the loops?
```
Originally we needed two while loops because `mirrorMakerConsumer.receive` will block indefinitely for the old consumer if there is no data. I think it should be safe to just remove the second while loop.
Thanks for the explanation. I'll keep the existing code (will just update the comment), but will file a JIRA for simplifying MirrorMaker.
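As a side note on the loop structure discussed above, here is a minimal self-contained sketch (simulated with a plain queue, not Kafka's actual consumer API; `FakeConsumer` is hypothetical) of why a timeout-based `poll` lets a single loop cover the no-data case: the loop body regains control even when nothing arrives, so the commit step still runs on every iteration.

```scala
import scala.collection.mutable

object LoopSketch {
  // Stand-in for a consumer whose poll(timeout) returns whatever is
  // immediately available, possibly nothing, instead of blocking forever.
  final class FakeConsumer(records: mutable.Queue[String]) {
    def poll(timeoutMs: Long): Seq[String] =
      if (records.nonEmpty) Seq(records.dequeue()) else Seq.empty
  }

  def main(args: Array[String]): Unit = {
    val consumer = new FakeConsumer(mutable.Queue("m1"))
    var commits = 0
    for (_ <- 1 to 3) {
      val batch = consumer.poll(100)
      batch.foreach(_ => ()) // stand-in for producer.send per record
      commits += 1           // commit runs even when the batch is empty
    }
    println(commits) // 3: idle iterations still reach the commit step
  }
}
```

With a blocking `receive()`, only the first iteration would have made progress while the stream is idle, which is why the old code needed the second loop to reach the commit.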
```scala
        records.asScala.foreach(producer.send)
        maybeFlushAndCommitOffsets()
      }
    } catch {
      case _: ConsumerTimeoutException =>
        trace("Caught ConsumerTimeoutException, continue iteration.")
        //FIXME Is this change OK?
```
I think this change is OK.
Thanks. What I was wondering about is whether there were cases where `ConsumerTimeoutException` is thrown that we don't want to catch and continue. `ConsumerTimeoutException` is only thrown by `NewShinyConsumer` if `poll` doesn't return any records. Maybe to be safe, I should maintain the exact behaviour and then we can do a separate PR to simplify MirrorMaker (instead of being in the middle of this huge one).
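To illustrate the concern about catching too broadly, a hedged sketch (hypothetical names, not Kafka's classes): only the idle-timeout exception should translate to "keep iterating", while any other failure must propagate out of the loop.

```scala
object TimeoutCatchSketch {
  // Stand-in for kafka's ConsumerTimeoutException; the name is illustrative.
  final class IdleTimeout extends RuntimeException("no records within timeout")

  // Returns true if the mirroring loop should keep iterating.
  def step(fetch: () => Unit): Boolean =
    try { fetch(); true }
    catch { case _: IdleTimeout => true } // only the idle timeout is swallowed

  def main(args: Array[String]): Unit = {
    println(step(() => throw new IdleTimeout)) // idle: keep iterating
    println(step(() => ()))                    // normal batch: keep iterating
    // any other exception thrown by fetch() is NOT caught here and aborts the loop
  }
}
```

The narrow `case _: IdleTimeout` is the point: a blanket `case _: Throwable` would also hide genuine failures, which is the behaviour difference being questioned above.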
```diff
@@ -57,7 +54,6 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
   }
 }

 @deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
```
It seems that `Blacklist` is only used in tests. Do we still need this?
Good point, we only need to keep `Whitelist`.
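For context, a rough sketch of the whitelist semantics being kept. This is illustrative only, not the actual `kafka.consumer.Whitelist` implementation; treating the raw string as a comma-separated list of regex alternatives is an assumption made for the example.

```scala
object WhitelistSketch {
  // A topic passes the filter if it fully matches any of the comma-separated
  // patterns, joined here into a single alternation regex.
  final case class Whitelist(rawRegex: String) {
    private val regex = rawRegex.split(",").map(_.trim).mkString("|").r
    def isAllowed(topic: String): Boolean = regex.pattern.matcher(topic).matches()
  }

  def main(args: Array[String]): Unit = {
    val filter = Whitelist("metrics-.*, logs")
    println(filter.isAllowed("metrics-cpu")) // true
    println(filter.isAllowed("internal"))    // false
  }
}
```

A blacklist would simply negate `isAllowed`, which is why keeping only the whitelist variant loses nothing essential once the blacklist has no production callers.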
```scala
 * limitations under the License.
 */

package kafka.zk
```
Since the broker still uses ZooKeeper, would this test class still be useful?
The broker doesn't use `ZkUtils` anymore, which is why I removed the `ZKCheckedEphemeral` code and tests. There is a `KafkaZkClient.CheckedEphemeral` that hasn't been removed and is still used.
@ijuma nit: could you please also remove ZookeeperConsumerConnector related
LGTM
```diff
-    if (!useOldConsumer) {
-      CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
```
Can we remove options/code related to old metrics reporter?
```diff
@@ -81,10 +81,14 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2
     updated to aggregate across different versions.
 </li>
 <li><a href="https://cwiki.apache.org/confluence/x/uaBzB">KIP-225</a> changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" has been removed.</li>
 <li>The Scala consumers, which have been deprecated since 0.11.0.0, have been removed. The Java consumer has been the recommended option
```
We can also remove the "Legacy APIs" and "Old Consumer Configs" sections from the docs.
Yeah, as I said in the PR description the documentation hasn't been updated and we can do that separately.
```diff
@@ -40,7 +40,7 @@ class ConsoleProducerTest {
   )

   @Test
-  def testValidConfigsNewProducer() {
+  def testValidConfigs() {
     val config = new ConsoleProducer.ProducerConfig(validArgs)
     // New ProducerConfig constructor is package private, so we can't call it directly
```
nit: can remove "New" from the comment. Also, the ProducerConfig constructor is public now.
```diff
@@ -1,132 +0,0 @@
-/**
```
Can we also remove ImportZkOffsets?
Also, we have not removed ZK-based offset management. Does that mean we are going to maintain ZK-based offset management for some more time, so that existing users can continue to use the Scala consumers from older releases if they want?
In that case, can we deprecate ZK-based offset management and mention it in the docs?
Good point, removed `ImportZkOffsets`. What do you mean by "we have not removed ZK based offset management"?
Yeah, we want the broker to still support everything for now. We need a KIP to discuss removal and deprecation of broker functionality related to the old consumer. I restored a change in `ZkData` as I hadn't realised that the broker was relying on that functionality itself.
docs/upgrade.html (outdated)
```diff
-<li>The ConsoleProducer no longer supports the Scala producer.</li>
+<li>MirrorMaker and ConsoleConsumer no longer support the Scala consumer, they always use the Java consumer.</li>
+<li>The ConsoleProducer no longer supports the Scala producer, it always uses the Java producer.</li>
 <li>A number of deprecated tools that rely on the Scala clients have been removed: ReplayLogProducer, SimpleConsumerPerformance, SimpleConsumerShell, UpdateOffsetsInZK, VerifyConsumerRebalance.</li>
```
nit: can we include ExportZkOffsets?
Force-pushed from 92da9bd to 6f10455
Rebased and addressed the comments so far. Filed KAFKA-7062 and KAFKA-7063 for follow-ups. If anyone wants to pick them up, feel free. :)
Force-pushed from aceb212 to dabfa0d
@lindong28, I fixed a few system test issues. One of the remaining ones is:

```python
# Verify the following:
# 1) The broker with offline directory is the only in-sync broker of the partition of topic2
# 2) Messages can still be produced and consumed from topic2
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic2,
                                   throughput=self.producer_throughput, offline_nodes=offline_nodes)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic2, group_id="test-consumer-group-2",
                                consumer_timeout_ms=90000, message_validator=is_int)
self.consumer_start_timeout_sec = 90
self.start_producer_and_consumer()
```

Since you wrote this test, is there a chance you could take a look? It's a bit worrying that this doesn't work with the Java consumer.
@ijuma Sure, I will look into this issue.
I'm running the broker system tests in Jenkins again (https://jenkins.confluent.io/view/All/job/system-test-kafka-branch-builder/1803/console), but I believe the only remaining failing test is
Force-pushed from dabfa0d to 7a8493f
@lindong28 do you think you'll have time to look into this today? If not, I'll partially disable the test, file a JIRA and merge this PR. These changes don't introduce the issue after all, they are just exposing the fact that the test as it stands doesn't work with the new consumer.
@ijuma Yes! I am looking into this now. Sorry for the delay.
@lindong28 Thanks! To be clear, there was no delay on your part, it's just that the timeline for this PR to make it into 2.0 is very short. :) So, I was trying to understand your availability to figure out the best path forward. I will wait in case there's an easy tweak to the test to make it pass with the new consumer.
@lindong28 interestingly, only 1 of 4 log dir failure tests failed in the run I started last night: https://jenkins.confluent.io/view/All/job/system-test-kafka-branch-builder/1804/consoleFull. So, it looks flaky instead of failing deterministically.
@lindong28 Looking at the test a bit more, when we "Shutdown all other brokers so that the broker with offline log dir is the only online broker", how do we ensure that there are no consumer_offsets partitions that become offline?
@ijuma Sure thing. I would be more than happy for this to make it into 2.0 and be shiny instead of disabling the test :) Most likely I will be able to fix it today. That is a very good point. The default offsets.topic.replication.factor in the Kafka server is 3. Since we start three brokers in the test before we start the producer and consumer, the offsets topic will be created with RF=3. And I verified that the consumer offsets topic is replicated across all three brokers and the test still failed.
Right. Do I understand correctly that we have 2 out of 3 brokers down and the remaining online broker has one log dir offline? It seems like some consumer offset partitions could be in the offline log dir, which would cause the coordinator to never become available again. |
@ijuma Ah, I see your point. Yes, you are right. If the consumer offsets topic is in the offline log directory of the only online broker then it will cause the CoordinatorNotAvailableException. Let me try to fix this problem. Thanks!
@lindong28 yes :)
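The failure mode discussed above can be sketched as a toy model (not Kafka code: the `abs(hashCode) % numPartitions` mapping mirrors how Kafka documents group-to-offsets-partition assignment, but everything else here is illustrative). If the group's `__consumer_offsets` partition has no online replica, because its only surviving replica sits in the offline log dir, the coordinator lookup never succeeds.

```scala
object CoordinatorSketch {
  // Which __consumer_offsets partition hosts this group's offsets and
  // coordinator (per Kafka's documented assignment rule).
  def offsetsPartition(groupId: String, numPartitions: Int): Int =
    math.abs(groupId.hashCode % numPartitions)

  // The coordinator is available only if that partition has an online leader.
  def coordinatorAvailable(groupId: String, numPartitions: Int,
                           onlinePartitions: Set[Int]): Boolean =
    onlinePartitions.contains(offsetsPartition(groupId, numPartitions))

  def main(args: Array[String]): Unit = {
    val p = offsetsPartition("test-consumer-group-2", 3)
    // Model the test scenario: the group's own offsets partition is the one
    // stranded in the offline log directory.
    val online = Set(0, 1, 2) - p
    println(coordinatorAvailable("test-consumer-group-2", 3, online)) // false
  }
}
```

This is consistent with the flakiness observed: the test only fails when the group happens to hash to a partition that ends up in the offline directory.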
…e to work with newer broker versions
Force-pushed from 7a8493f to 2e94ea1
Thanks @lindong28. The log dir failure tests pass after that patch is applied: https://jenkins.confluent.io/view/All/job/system-test-kafka-branch-builder/1810/
@ijuma Great! The patch looked good overall when I reviewed it a few days ago. A few minor commits have been made to ensure that we pass integration tests and system tests. Do you need me to go over this again?
@lindong28, it's enough for you to review the commits added since your last review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Left only one minor comment.
```diff
@@ -318,13 +246,7 @@ object ConsoleConsumer extends Logging {
     .ofType(classOf[java.lang.Integer])
   val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
     "skip it instead of halt.")
-  val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
```
Is `csvMetricsReporterEnabledOpt` only used with the Scala consumer? It seems that `KafkaCSVMetricsReporter` is not deprecated or removed.
`KafkaCSVMetricsReporter` is based on Yammer Metrics, which is still used by the broker and hence why it's not deprecated. The Java clients use Kafka Metrics instead. So, it seems to me that no metrics would be reported if this option is enabled with the Java consumer, making it useless. Am I missing something?
Ah I see. I didn't know that KafkaCSVMetricsReporter is based on Yammer metrics. Then it makes sense to remove it since only scala clients use Yammer metrics. Thanks for the explanation. I have no further comments.
Thanks for the reviews. Merging to trunk and 2.0.
- Removed Scala consumers (`SimpleConsumer` and `ZooKeeperConsumerConnector`) and their tests.
- Removed Scala request/response/message classes.
- Removed any mention of new consumer or new producer in the code with the exception of MirrorMaker where the new.consumer option was never deprecated so we have to keep it for now. The non-code documentation has not been updated either, that will be done separately.
- Removed a number of tools that only made sense in the context of the Scala consumers (see upgrade notes).
- Updated some tools that worked with both Scala and Java consumers so that they only support the latter (see upgrade notes).
- Removed `BaseConsumer` and related classes apart from `BaseRecord` which is used in `MirrorMakerMessageHandler`. The latter is a pluggable interface so effectively public API.
- Removed `ZkUtils` methods that were only used by the old consumers.
- Removed `ZkUtils.registerBroker` and `ZKCheckedEphemeral` since the broker now uses the methods in `KafkaZkClient` and no-one else should be using that method.
- Updated system tests so that they don't use the Scala consumers except for multi-version tests.
- Updated LogDirFailureTest so that the consumer offsets topic would continue to be available after all the failures. This was necessary for it to work with the Java consumer.
- Some multi-version system tests had not been updated to include recently released Kafka versions, fixed it.
- Updated findBugs and checkstyle configs not to refer to deleted classes and packages.

Reviewers: Dong Lin <lindong28@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
…hich was removed by "KAFKA-2983: Remove Scala consumers and related code (apache#5230)" (commit cc4dce9). TICKET = LI_DESCRIPTION = EXIT_CRITERIA = MANUAL ["describe exit criteria"]
…hich was removed by "KAFKA-2983: Remove Scala consumers and related code (apache#5230)" (commit cc4dce9). TICKET = LI_DESCRIPTION = This patch included fix by Jon lee from 2.0-li branch and a follow up fix by xiongqi welsey wu for 2.3 branch. EXIT_CRITERIA = MANUAL ["When old scala consumer is no longer needed"]
…hich was removed by "KAFKA-2983: Remove Scala consumers and related code (apache#5230)" (commit cc4dce9) in upstream. TICKET = LI_DESCRIPTION = This patch included fix by Jon Lee from 2.0-li branch and a follow up fix by Xiongqi Welsey Wu for 2.3-li branch. EXIT_CRITERIA = MANUAL ["When old Scala consumer is no longer needed"]