
KAFKA-15649: Handle directory failure timeout #15697

Merged: 4 commits into apache:trunk on May 23, 2024

Conversation

@viktorsomogyi (Contributor)

A broker that is unable to communicate with the controller will shut down after the configurable log.dir.failure.timeout.ms.

The implementation adds a new event to the Kafka EventQueue. This event is deferred by the configured timeout and will execute the shutdown if the heartbeat communication containing the failed log dir is still pending with the controller.

An extra test is added to LogDirFailureTest to cover the case when a failure couldn't be communicated to the controller. The test case shuts down the controller and causes a log dir failure to assert the expected broker shutdown.
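For context, here is a minimal, self-contained sketch of the deferred-shutdown idea the description refers to, modelled with a plain ScheduledExecutorService rather than Kafka's internal EventQueue. All class and method names below are illustrative; this is not the merged code.

import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative sketch only: defer a shutdown check by the configured timeout
// and shut down if the failed dir is still not acknowledged by the controller.
class DirFailureShutdownScheduler(timeoutMs: Long, shutdown: () => Unit) {
  private val executor = Executors.newSingleThreadScheduledExecutor()
  // dir id -> true once the controller has acknowledged the failed directory
  @volatile private var propagated = Map.empty[UUID, Boolean]

  // Called when a log directory fails: remember it and schedule a check after
  // the configured timeout (log.dir.failure.timeout.ms in the real feature).
  def onDirFailure(dirId: UUID): Unit = {
    synchronized { propagated += dirId -> false }
    executor.schedule(new Runnable {
      override def run(): Unit =
        // Heartbeat carrying this dir still unacknowledged: shut the broker down.
        if (!propagated.getOrElse(dirId, false)) shutdown()
    }, timeoutMs, TimeUnit.MILLISECONDS)
  }

  // Called when a heartbeat response confirms the controller saw the failure.
  def onHeartbeatAcked(dirId: UUID): Unit =
    synchronized { propagated += dirId -> true }
}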

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@viktorsomogyi (Contributor Author)

Rebased it due to conflicts.

@@ -94,6 +94,7 @@ public class Defaults {
public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 60000;
public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1;
public static final boolean AUTO_CREATE_TOPICS_ENABLE = true;
public static final long LOG_DIR_FAILURE_TIMEOUT_MS = 30000L;
Member:

This default seems reasonable to me.

@@ -507,6 +522,7 @@ class BrokerLifecycleManager(
if (errorCode == Errors.NONE) {
  val responseData = message.data()
  failedAttempts = 0
  offlineDirs = offlineDirs.map(kv => kv._1 -> true)
Member:

I think this is incorrect. If a new failed directory is added to offlineDirs in between a heartbeat request and its response, then we'll clear it here before knowing whether it has been propagated to the controller.

One idea is to hand the set of offline dirs included in the request down from sendBrokerHeartBeat() to BrokerHeartbeatResponseEvent through BrokerHeartbeatResponseHandler as a new constructor argument.
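A hedged sketch of that suggestion (hypothetical names, not the actual BrokerLifecycleManager fields): snapshot the dirs included in a request and mark only that snapshot as propagated when its response arrives.

import java.util.UUID

// Illustrative only: a dir that fails between sending a heartbeat and
// receiving its response stays unacknowledged, because the response handler
// only marks the snapshot that was actually sent.
class OfflineDirState {
  // dir id -> true once the controller has acknowledged it
  private var offlineDirs = Map.empty[UUID, Boolean]

  def markOffline(dir: UUID): Unit = synchronized {
    offlineDirs += dir -> offlineDirs.getOrElse(dir, false)
  }

  // Snapshot taken while building the heartbeat request.
  def dirsForRequest(): Set[UUID] = synchronized { offlineDirs.keySet }

  // Response handler receives the snapshot captured at send time.
  def markPropagated(sentDirs: Set[UUID]): Unit = synchronized {
    offlineDirs = offlineDirs.map { case (dir, acked) =>
      dir -> (acked || sentDirs.contains(dir))
    }
  }
}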

private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
  override def run(): Unit = {
    if (!offlineDirs.getOrElse(offlineDir, false)) {
      error(s"Shutting down because couldn't communicate offline log dirs with controllers")
Member:

We should include the directory in the error. It might also be helpful to resolve the directory ID to its path. Perhaps something like dirIdToPath in AssignmentsManager should be made available here as well.
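A hedged sketch of that idea (hypothetical names; this is not the AssignmentsManager code): keep a dir-id-to-path lookup so the shutdown error can name the path as well as the Uuid.

import java.util.UUID

// Illustrative only: resolve a directory id to its path for error messages.
class DirIdResolver(idToPath: Map[UUID, String]) {
  def describe(dirId: UUID): String =
    idToPath.get(dirId).map(path => s"$dirId ($path)").getOrElse(dirId.toString)
}

// e.g. error(s"Shutting down because offline log dir ${resolver.describe(dirId)} " +
//   "could not be communicated to the controller")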

Contributor Author:

I'll print only the UUID here, and I'll modify the other log statements to contain the UUID so one can pair these log statements when analyzing the logs. Printing the dir path here would be a bit more of a stretch, as we currently don't propagate it down to this level. Let me know if you think it'd be better to print the path here.

@@ -211,7 +211,8 @@ class BrokerServer(
time,
s"broker-${config.nodeId}-",
isZkBroker = false,
logDirs = logManager.directoryIdsSet)
logDirs = logManager.directoryIdsSet,
() => kafkaScheduler.schedule("shutdown", () => shutdown(), 0, -1))
Member:

There's a scheduleOnce alternative which sets periodMs to -1.
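For reference, the suggested call might look roughly like the line below, mirroring the schedule(...) call in the diff above. The exact scheduleOnce signature is assumed here, not taken from the merged code.

() => kafkaScheduler.scheduleOnce("shutdown", () => shutdown())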

@@ -528,6 +529,10 @@ object KafkaConfig {
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."

val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully communicate to the controller that some log " +
"directory has failed for longer than this time, and there's at least one partition with leadership on that directory, " +
Member:

and there's at least one partition with leadership

We aren't checking for this condition. We can either a) implement it, or b) keep it simple and drop this from the configuration description.

Contributor Author:

I'll do another round on this; there might be a way in BrokerServer to extract this information using a combination of MetadataCache, ReplicaManager and LogManager. I'll update you tomorrow on my findings.

@@ -870,6 +875,7 @@ object KafkaConfig {
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
.define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
.define(LogMessageDownConversionEnableProp, BOOLEAN, LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, LogMessageDownConversionEnableDoc)
.define(LogDirFailureTimeoutMsProp, LONG, Defaults.LOG_DIR_FAILURE_TIMEOUT_MS, atLeast(0), MEDIUM, LogDirFailureTimeoutMsDoc)
Member:

In the KIP the accepted value range is defined as >= 1. I wonder if values below 1s actually make much sense.
Also the importance was defined as low.
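For comparison, a definition aligned with the KIP as described in this comment would look roughly like the line below (sketch only, not the merged change).

.define(LogDirFailureTimeoutMsProp, LONG, Defaults.LOG_DIR_FAILURE_TIMEOUT_MS, atLeast(1), LOW, LogDirFailureTimeoutMsDoc)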

@viktorsomogyi (Contributor Author)

Rebased on latest trunk as there were some conflicts. Addressed some of the comments, but there are 2 things I still need to investigate:

  • LogDirFailureTest fails in @AfterAll, likely because of an incorrect shutdown; perhaps there's a timing issue
  • Check whether we can detect if there are any leaders before shutdown

I'll update on both shortly, hopefully tomorrow.

@viktorsomogyi (Contributor Author)

@soarez in the end I chose the shortcut regarding detecting leaders before shutdown. The reason is that the solution required for this would itself be complex.

On the one hand, the sequence of events is problematic. First we update the LogManager and only then try to propagate the event to the controller. At this point the metadata is stale, so I can't use it as reliable information to detect whether partitions have leadership or not. A workaround would be to subtract the LogManager's data from the metadata cache (i.e. if there is only a single ISR replica and that is the current broker, then we can accept the partition as offline in reality). I don't feel that this is a robust solution; it could be prone to race conditions on the network depending on how requests come from the controller as long as it's alive. I think it's more robust to just fail if we can't contact the controller.

The second reason is a bit technical and can be worked around, although it requires a lot of effort. When trying to extract the replica->logdir information from LogManager, the only information about the log dir available from the event is its Uuid. Unfortunately LogManager doesn't store the Uuid of an offline dir (and besides, I don't think Uuid and logdir names are used consistently across the whole module). This could be solved by propagating both the logdir path and the Uuid in the events, or by storing offline dirs' Uuids in LogManager. I think the latter is problematic because we can't know how long we should keep information about offline dirs, as they might never come back. The former can be done, although it could be a sizeable refactor, and generally I felt that choosing the simpler route now would be more robust.

Let me know if you think we should try it.
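A hedged sketch of the first option mentioned above, i.e. propagating both the logdir path and its Uuid in the events (hypothetical names; not a change that was implemented):

import java.util.UUID

// Illustrative only: carry the path together with the Uuid so downstream
// consumers of the failure event don't need to resolve one from the other.
final case class OfflineLogDir(id: UUID, path: String)

final case class LogDirFailureNotification(dir: OfflineLogDir) {
  def describe: String = s"${dir.id} (${dir.path})"
}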

@soarez (Member) left a comment:

@viktorsomogyi I think you're right. I'd also add a third reason to keep this simple, which is that this is a mitigation for a problem that at this point is only theoretical.

If it proves necessary in the future we can go down the more complex route. Regarding the first problem you describe, the broker knows, regardless of stale metadata, which partitions it is effectively leading. And in the case of a race where a lagging metadata update promotes it to leader for some partition, that can be dealt with by extending maybeUpdateTopicAssignment to handle assignments into directories that are offline, correcting them to DirectoryId.LOST.

I think this looks good, I just have a tiny comment.
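A hedged sketch of the race handling described in this comment (hypothetical names; the real change would live around maybeUpdateTopicAssignment and use DirectoryId.LOST):

import java.util.UUID

object AssignmentCorrection {
  // Stand-in for DirectoryId.LOST in this sketch.
  val Lost: UUID = new UUID(0L, 0L)

  // If a lagging metadata update assigns a partition to a directory the broker
  // already knows is offline, correct the assignment to the LOST marker.
  def correct(assignedDir: UUID, offlineDirs: Set[UUID]): UUID =
    if (offlineDirs.contains(assignedDir)) Lost else assignedDir
}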

core/src/main/scala/kafka/server/ReplicaManager.scala (comment outdated, resolved)
@viktorsomogyi (Contributor Author)

@soarez thanks for the info. I addressed your comment. Do you have anything more to add or are we good to go?

@soarez (Member) left a comment:

Thanks for pinging me @viktorsomogyi.

All the failed tests in CI pass locally.

LGTM

@soarez merged commit 5a48984 into apache:trunk on May 23, 2024
1 check failed
apourchet added a commit to apourchet/kafka that referenced this pull request May 23, 2024
commit 93238ae
Author: Antoine Pourchet <antoine@responsive.dev>
Date:   Thu May 23 13:45:29 2024 -0600

    KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified  (apache#16034)

    This PR uses the new TaskTopicPartition structure to simplify the build
    process for the ApplicationState, which is the input to the new
    TaskAssignor#assign call.

    Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>

commit 4020307
Author: Kuan-Po (Cooper) Tseng <brandboat@gmail.com>
Date:   Fri May 24 02:51:26 2024 +0800

    KAFKA-16795 Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter (apache#16020)

    This commit allows users to apply the scala version Formatters, but users will receive the warning messages about deprecation.

    This compatibility support will be removed from 4.0.0

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit c3018ef
Author: TingIāu "Ting" Kì <51072200+frankvicky@users.noreply.github.com>
Date:   Fri May 24 01:15:56 2024 +0800

    KAFKA-16804: Replace archivesBaseName with archivesName (apache#16016)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Greg Harris <greg.harris@aiven.io>

commit 0ba15ad
Author: Edoardo Comar <ecomar@uk.ibm.com>
Date:   Thu May 23 17:17:56 2024 +0100

    KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… (apache#15910)

    * KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation

    MirrorCheckpointTask reloads the last checkpoint at start,
    OffsetSyncStore stores OffsetSyncs before reading till end.

    If CheckpointTask cannot read checkpoints at startup,
    use previous OffsetSyncStore load logic, with
    warning log message about degraded offset translation.

    Also addresses KAFKA-16622 Mirromaker2 first Checkpoint not emitted until
    consumer group fully catches up once because the OffsetSyncStore store
    is populated before reading to log end.

    Co-Authored-By: Adrian Preston <prestona@uk.ibm.com>
    Reviewers: Greg Harris <greg.harris@aiven.io>

commit 5a48984
Author: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Date:   Thu May 23 17:36:39 2024 +0200

    KAFKA-15649: Handle directory failure timeout (apache#15697)

    A broker that is unable to communicate with the controller will shut down
    after the configurable log.dir.failure.timeout.ms.

    The implementation adds a new event to the Kafka EventQueue. This event
    is deferred by the configured timeout and will execute the shutdown
    if the heartbeat communication containing the failed log dir is still
    pending with the controller.

    Reviewers: Igor Soarez <soarez@apple.com>

commit 8d117a1
Author: Mickael Maison <mimaison@users.noreply.github.com>
Date:   Thu May 23 17:03:24 2024 +0200

    KAFKA-16825: Update netty/jetty/jackson/zstd dependencies (apache#16038)

    Reviewers: Luke Chen <showuon@gmail.com>

commit ab0cc72
Author: Mickael Maison <mimaison@users.noreply.github.com>
Date:   Thu May 23 16:01:45 2024 +0200

    MINOR: Move parseCsvList to server-common (apache#16029)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 14b5c4d
Author: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com>
Date:   Thu May 23 02:27:00 2024 -0400

    KAFKA-16793; Heartbeat API for upgrading ConsumerGroup (apache#15988)

    This patch implements the heartbeat api to the members that use the classic protocol in a ConsumerGroup.

    Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>

commit e692fee
Author: Jeff Kim <kimkb2011@gmail.com>
Date:   Thu May 23 02:24:23 2024 -0400

    MINOR: fix flaky testRecordThreadIdleRatio (apache#15987)

    DelayEventAccumulator should return immediately if there are no events in the queue. Also removed some unused fields inside EventProcessorThread.

    Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>

commit bef83ce
Author: Nick Telford <nick.telford@gmail.com>
Date:   Thu May 23 05:34:31 2024 +0100

    KAFKA-15541: Add iterator-duration metrics (apache#16028)

    Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).

    This new `StateStore` metric tracks the average and maximum amount of
    time between creating and closing Iterators.

    Iterators with very high durations can indicate to users performance
    problems that should be addressed.

    If a store reports no data for these metrics, despite the user opening
    Iterators on the store, it suggests those iterators are not being
    closed, and have therefore leaked.

    Reviewers: Matthias J. Sax <matthias@confluent.io>
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Jun 1, 2024
A broker that is unable to communicate with the controller will shut down
after the configurable log.dir.failure.timeout.ms.

The implementation adds a new event to the Kafka EventQueue. This event
is deferred by the configured timeout and will execute the shutdown
if the heartbeat communication containing the failed log dir is still
pending with the controller.

Reviewers: Igor Soarez <soarez@apple.com>