
KAFKA-14307; Controller time-based snapshots #12761

Merged
merged 14 commits into apache:trunk from kafka-14307-controller-time-snapshot on Nov 22, 2022

Conversation

@jsancio jsancio (Member) commented Oct 17, 2022

Implement time-based snapshots for the controller. The general strategy for this feature is that the controller will use the record batch's append time to determine if a snapshot should be generated. If the oldest record that has been committed but is not included in the latest snapshot is older than metadata.log.max.snapshot.interval.ms, the controller will trigger a snapshot immediately. This is useful in case the controller was offline for more than metadata.log.max.snapshot.interval.ms milliseconds.

If the oldest record that has been committed but is not included in the latest snapshot is NOT older than metadata.log.max.snapshot.interval.ms, the controller will schedule a maybeGenerateSnapshot deferred task.

It is possible that, when the controller wants to generate a new snapshot (either because of time or because of the number of bytes), it is already generating one. In this case the SnapshotGeneratorManager was changed so that it checks, and potentially triggers, another snapshot when the currently in-progress snapshot finishes.
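
A minimal, self-contained sketch of the time-based part of this decision; the class, field, and method names here are illustrative assumptions, not the actual quorum controller code:

    // Illustrative sketch only; the real controller tracks these values internally and
    // schedules the deferred maybeGenerateSnapshot check on its event queue.
    final class TimeBasedSnapshotDecision {
        private final long maxSnapshotIntervalMs;

        TimeBasedSnapshotDecision(long maxSnapshotIntervalMs) {
            this.maxSnapshotIntervalMs = maxSnapshotIntervalMs;
        }

        /**
         * Returns 0 if a snapshot should be generated immediately, otherwise the delay
         * in milliseconds before the deferred maybeGenerateSnapshot check should run.
         */
        long delayUntilNextCheckMs(long oldestNonSnapshottedTimestampMs, long nowMs) {
            long ageMs = nowMs - oldestNonSnapshottedTimestampMs;
            if (ageMs >= maxSnapshotIntervalMs) {
                return 0L;                              // snapshot immediately
            }
            return maxSnapshotIntervalMs - ageMs;       // schedule the deferred task
        }
    }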

To better support this feature the following additional changes were made:

  1. The configuration metadata.log.max.snapshot.interval.ms was added to KafkaConfig with a default value of one hour.
  2. RaftClient was extended to return the latest snapshot id. This snapshot id is used to determine if a given record is included in a snapshot.
  3. The SnapshotReason type was improved to support including values in the message (see the sketch after this list).
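
As an illustration of item 3, a reason type that carries the triggering values in its message might look like the following sketch. The names and message wording are assumptions; the actual SnapshotReason type lives in the Kafka code base and may differ:

    import java.util.List;

    // Hypothetical sketch; names and message wording are illustrative.
    final class SnapshotReasonSketch {
        private final String message;

        private SnapshotReasonSketch(String message) {
            this.message = message;
        }

        static SnapshotReasonSketch maxBytesExceeded(long bytes, long maxBytes) {
            return new SnapshotReasonSketch(String.format(
                "%d bytes exceeded the maximum bytes between snapshots of %d", bytes, maxBytes));
        }

        static SnapshotReasonSketch maxIntervalExceeded(long intervalMs, long maxIntervalMs) {
            return new SnapshotReasonSketch(String.format(
                "%d ms elapsed, exceeding the maximum snapshot interval of %d ms", intervalMs, maxIntervalMs));
        }

        @Override
        public String toString() {
            return message;
        }

        // Joining several reasons into a single log line.
        static String describe(List<SnapshotReasonSketch> reasons) {
            return String.join(", ", reasons.stream().map(SnapshotReasonSketch::toString).toList());
        }
    }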

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@jsancio jsancio force-pushed the kafka-14307-controller-time-snapshot branch from c182211 to 74607ca on October 18, 2022 18:29
@jsancio jsancio force-pushed the kafka-14307-controller-time-snapshot branch from 0c22f88 to 53c854f on October 18, 2022 19:07
@jsancio jsancio marked this pull request as ready for review October 19, 2022 00:37
@dengziming dengziming self-assigned this Nov 4, 2022
core/src/main/scala/kafka/server/KafkaConfig.scala (two outdated review threads, resolved)
    queue.scheduleDeferred(event.name,
        new EarliestDeadlineFunction(time.nanoseconds() + delayNs), event);
}

void handleSnapshotFinished(Optional<Exception> exception) {
    if (exception.isPresent()) {
        log.error("Error while generating snapshot {}", generator.lastContainedLogOffset(), exception.get());
Contributor

If there is a snapshot failure, will we wait for the next interval before retrying? I wonder if we need a metric or something that gets incremented when this happens?

Member Author

If there is a snapshot failure, will we wait for the next interval before retrying?

Yeah. The snapshot counters committedBytesSinceLastSnapshot and oldestCommittedLogOnlyAppendTimestamp are updated assuming that snapshot generation cannot fail.

We could add another reason ("snapshot failure") that triggers a snapshot immediately, irrespective of the counters. I am concerned that this event/task may starve other controller events. Maybe it is better to just rely on timed snapshots and NoOpRecord to trigger another snapshot in the case of failures.

I wonder if we need a metric or something that gets incremented when this happens?

Let me check the KIPs, but I don't think we define this metric. I can add it and update one of the KIPs if we agree to it here.

Member Author

How about this for the controller:
kafka.controller:type=KafkaController,name=MetadataSnapshotGenerationErrors
Incremented anytime the controller fails to generate a snapshot. Reset to zero anytime the controller restarts or a snapshot is successfully generated.

And this for the brokers:
kafka.server:type=broker-metadata-metrics,name=snapshot-generation-errors
Incremented anytime the broker fails to generate a snapshot. Reset to zero anytime the broker restarts or a snapshot is successfully generated.

@hachikuji do you mind if I implement this in a future PR? I created this issue: https://issues.apache.org/jira/browse/KAFKA-14403

Contributor

Doing it separately sounds fine.

Contributor

kafka.controller:type=KafkaController,name=MetadataSnapshotGenerationErrors

I am concerned that with a counter that resets at restart, we might have an ever-so-small likelihood of never incrementing the metric even with the log moving and no snapshots generated, e.g. if we keep crashing while generating a snapshot. Ditto for the broker.

What do you think about having a kafka.controller:type=KafkaController,name=LastSnapshotTime metric?
This metric would just emit the timestamp (Unix epoch) of when the last snapshot was successfully written.

Member Author

Okay. Let's move this discussion to the DISCUSSION thread in Kafka dev. I'll send a message next week.

@@ -986,6 +1012,13 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
    batch.appendTimestamp(),
    committedBytesSinceLastSnapshot + batch.sizeInBytes()
);

if (offset >= raftClient.latestSnapshotId().map(OffsetAndEpoch::offset).orElse(0L)) {
    oldestCommittedLogOnlyAppendTimestamp = Math.min(
Contributor

Why "LogOnlyAppendTimestamp" instead of "LogAppendTimestamp"?

Member Author (@jsancio, Nov 18, 2022)

It is the oldest timestamp in the log that is not included in a snapshot. I was thinking that oldestCommittedLogAppendTimestamp could mislead the reader if they don't read the description of that variable:

      /**
       * Timestamp for the oldest record that was committed but not included in a snapshot.
       */
      private long oldestCommittedLogOnlyAppendTimestamp = Long.MAX_VALUE;

Contributor

nit: Maybe overkill, but how about something even more verbose like oldestNonSnapshottedLogAppendTimestamp?
The current name still made me read the description.

Member Author

I was thinking of a similar name when I was implementing this feature, but I felt it was too verbose. I don't have a strong opinion here, so I can change it to whatever name the reviewers prefer.

Contributor

I do think oldestNonSnapshottedLogAppendTimestamp is a little clearer. Perhaps the fact that it is an append timestamp is already clear from context and we could use oldestNonSnapshottedTimestamp? I also don't feel strongly though.

Member Author

I picked oldestNonSnapshottedTimestamp. Updated the PR.

@niket-goel (Contributor) left a comment

Thanks for the changes @jsancio! Left a few minor comments.

raft/src/main/java/org/apache/kafka/raft/RaftClient.java (outdated review thread, resolved)
core/src/main/scala/kafka/server/ControllerServer.scala (outdated review thread, resolved)
core/src/main/scala/kafka/server/KafkaConfig.scala (three outdated review threads, resolved)

// Delete every in-memory snapshot up to the committed offset. They are not needed since this
// snapshot generation finished.
snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset);
Contributor

Question: Is this OK to do even if the snapshot never succeeded? I think the answer would be yes, but just confirming.

Member Author

Yes. The controller needs to keep in-memory snapshots older than the last committed offset because a snapshot may be iterating over those timelined values. Once we know that there are no pending snapshots (either because the last one succeeded or because it failed), the controller can delete in-memory snapshots up to the committed offset.
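
A small illustrative sketch of that ordering, using hypothetical names rather than the real SnapshotRegistry/SnapshotGeneratorManager API:

    import java.util.Optional;
    import java.util.TreeMap;

    // Hypothetical sketch: pruning in-memory snapshots up to the last committed offset
    // happens only once no on-disk snapshot generation is pending, and is then safe
    // whether that generation succeeded or failed, because nothing iterates the
    // timelined values any more.
    final class InMemorySnapshotPruning {
        private final TreeMap<Long, Object> inMemorySnapshots = new TreeMap<>();
        private long lastCommittedOffset = 0L;

        void handleSnapshotFinished(Optional<Exception> error) {
            error.ifPresent(e -> System.err.println("Snapshot generation failed: " + e));
            // Drop every in-memory snapshot strictly older than the last committed offset.
            inMemorySnapshots.headMap(lastCommittedOffset, false).clear();
        }
    }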

committedBytesSinceLastSnapshot = 0;
if (!snapshotReasons.isEmpty()) {
    if (!isActiveController()) {
        // The active controller creates in-memory snapshot every time an uncommitted
Contributor

Nit:
It would be a little easier to read if the comment was rephrased to state what is happening here:

// The standby controllers do not create in-memory snapshots every time a 
// batch gets appended, so we create it now.

Member Author

I reordered the sentences and improved the wording. I kept both sentences because I think they are important for anyone interested in understanding when the controller generates and deletes in-memory snapshots.

val MetadataSnapshotMaxIntervalMsDoc = "This is the maximum number of milliseconds to wait to generate a snapshot " +
  "if there are committed records in the log that are not included in the latest snapshot. A value of zero disables " +
  s"time based snapshot generation. The default value is ${Defaults.MetadataSnapshotMaxIntervalMs}. To generate " +
  s"snapshots based on the number of metadata bytes, see the <code>$MetadataSnapshotMaxNewRecordBytesProp</code> " +
Contributor

Instead of just referring to the other configuration, I was thinking we could mention that snapshots will be taken when either the interval is reached or the max bytes limit is reached.

Member Author

Done. Added a sentence to both descriptions explaining this.

core/src/main/scala/kafka/server/KafkaConfig.scala (outdated review thread, resolved)

cancelNextGenerateSnapshot();
}
} else {
/* Skip snapshot generation if there is a snapshot in progress.
Contributor

nit: I think this would be a little clearer if the if check is inverted:

if (snapshotGeneratorManager.snapshotInProgress()) {
  /* Skip snapshot generation if there is a snapshot in progress.
...
} else {

Member Author

Done.


// The snapshot counters for size-based and time-based snapshots could have changed to cause a new
// snapshot to get generated.
maybeGenerateSnapshot();
Contributor

In case there was a failure, does it make sense to back off before retrying?

Member Author (@jsancio, Nov 21, 2022)

The quorum controller resets the size-based (committedBytesSinceLastSnapshot) and time-based (oldestNonSnapshottedTimestamp) variables when it starts a snapshot. If the snapshot fails, these variables have still been reset when snapshot generation started.

This acts as a throttle. Snapshot generation will be triggered at most as often as described by metadata.log.max.record.bytes.between.snapshots and metadata.log.max.snapshot.interval.ms, even if a snapshot happens to fail.
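
A compact sketch of that behaviour, assuming illustrative field and method names rather than the actual quorum controller code:

    // Hypothetical sketch: resetting the counters when generation *starts* means a
    // failed attempt does not schedule an immediate retry; the next attempt has to
    // wait for the byte or time threshold to be crossed again.
    final class SnapshotTriggerThrottle {
        private long committedBytesSinceLastSnapshot = 0L;
        private long oldestNonSnapshottedTimestamp = Long.MAX_VALUE;

        void onCommittedBatch(long sizeInBytes, long appendTimestampMs) {
            committedBytesSinceLastSnapshot += sizeInBytes;
            oldestNonSnapshottedTimestamp = Math.min(oldestNonSnapshottedTimestamp, appendTimestampMs);
        }

        boolean shouldSnapshot(long maxBytesBetweenSnapshots, long maxSnapshotIntervalMs, long nowMs) {
            return committedBytesSinceLastSnapshot >= maxBytesBetweenSnapshots
                || (oldestNonSnapshottedTimestamp != Long.MAX_VALUE
                    && nowMs - oldestNonSnapshottedTimestamp >= maxSnapshotIntervalMs);
        }

        void startSnapshotGeneration() {
            // Reset on start, not on success.
            committedBytesSinceLastSnapshot = 0L;
            oldestNonSnapshottedTimestamp = Long.MAX_VALUE;
        }
    }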

@hachikuji (Contributor) left a comment

Thanks, LGTM

@jsancio jsancio merged commit 72b535a into apache:trunk Nov 22, 2022
@jsancio jsancio deleted the kafka-14307-controller-time-snapshot branch November 22, 2022 01:30
guozhangwang pushed a commit to guozhangwang/kafka that referenced this pull request Jan 25, 2023

Reviewers: Jason Gustafson <jason@confluent.io>, Niket Goel <niket-goel@users.noreply.github.com>