KAFKA-10199: Separate restore threads #8988

Closed

Conversation

guozhangwang
Contributor

@guozhangwang guozhangwang commented Jul 6, 2020

Main Ideas:

  1. Add a separate StateRestoreThread which is responsible for restoring active tasks and updating standby tasks. We have N restore threads for N stream threads, with a 1-1 mapping, simply because it makes the synchronization simpler and keeps the number of restore consumers the same.

  2. Extract the StoreChangelogReader from all other classes (TaskManager, StreamThread) so that it is solely owned by this thread, and hence the reader can remain non-thread-safe. The communication between the two threads is just two concurrent queues: a) one for sending new / closed tasks that should be registered / deregistered at the restore thread, and b) one for letting the restore thread communicate task-corruption exceptions back to the main thread to handle (see the sketch after this list).

  3. There are a bunch of tricky things that I had to work around, which I've also highlighted in the comments below.

  4. Updated unit tests, with some refactoring as well.
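To make idea 2 concrete, here is a minimal sketch of the two-queue handshake between the stream thread and the restore thread. All names here (TaskItem, addTaskItem, nextCorruptedException) are illustrative stand-ins, not the actual classes in this PR.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class RestoreThreadSketch extends Thread {
    enum ItemType { CREATE, CLOSE }

    static final class TaskItem {
        final ItemType type;
        final String taskId;
        TaskItem(final ItemType type, final String taskId) {
            this.type = type;
            this.taskId = taskId;
        }
    }

    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    // queue (a): stream thread -> restore thread, tasks to register / unregister
    private final BlockingQueue<TaskItem> taskItemQueue = new LinkedBlockingQueue<>();
    // queue (b): restore thread -> stream thread, corruption errors to rethrow and handle
    private final BlockingQueue<RuntimeException> corruptedExceptions = new LinkedBlockingQueue<>();

    // called by the stream thread whenever a task is created or closed
    void addTaskItem(final TaskItem item) {
        taskItemQueue.add(item);
    }

    // called by the stream thread once per loop; returns null if there is nothing to handle
    RuntimeException nextCorruptedException() {
        return corruptedExceptions.poll();
    }

    @Override
    public void run() {
        while (isRunning.get()) {
            final List<TaskItem> items = new ArrayList<>();
            taskItemQueue.drainTo(items);
            for (final TaskItem item : items) {
                // register (CREATE) or unregister (CLOSE) the task's changelogs with the
                // changelog reader, which stays single-threaded and non-thread-safe
            }
            // restore a batch of records; if a corruption is detected, enqueue the
            // exception onto corruptedExceptions instead of throwing across threads
        }
    }

    void shutdown() {
        isRunning.set(false);
    }
}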

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang
Contributor Author

@ableegoldman @cadonna, please take a look at the synchronization mechanism.

Contributor

@ableegoldman ableegoldman left a comment

Hey @guozhangwang

I know this is marked as a WIP so maybe I'm getting ahead of things here, but I feel like this PR is missing a major piece of the restore-in-a-separate-thread picture: letting the StreamThread do useful work while restoration occurs.

Today all active and standby processing is blocked as long as there is a single task still restoring. In my understanding, the primary benefit of moving restoration to a separate thread was to unblock the StreamThread. Otherwise we're just doing the same thing but with more overhead 🙂

Is this planned for a follow-up PR or were you just waiting to get a review of the basics before diving deeper into this work?

@@ -411,7 +411,10 @@ boolean tryToCompleteRestoration() {
} else {
// we found a restoring task that isn't done restoring, which is evidence that
// not all tasks are running
log.debug("Task {} has not completed restoration, will check next time", task.id());

allRunning = false;
Contributor

Don't we want to modify the TaskManager so the StreamThread doesn't have to wait for all tasks to finish restoring? It should be able to start processing any active tasks as soon as they finish restoring in the other thread.

Contributor Author

My plan is to modify the condition inside StreamThread#runOnce so that we can still process even if state != RUNNING (i.e. even before all tasks have completed restoration), as long as taskManager#hasRunnableActiveTask. Does that make sense?


if (!activeRestoringChangelogs().isEmpty() && state == ChangelogReaderState.STANDBY_UPDATING) {
if (!activeRestoringChangelogs().isEmpty() && currentState == ChangelogReaderState.STANDBY_UPDATING) {
Contributor

Why not let the StreamThread process some standbys while the restore thread does its restoring? Not sure if we plan to ultimately move standbys to a new thread, or to share with the restore thread, but it seems like we shouldn't block them on restoration or we're missing out on a huge piece of the available improvement.

Especially with KIP-441, where for most tasks the majority of the catching-up will actually happen while the task is a standby rather than through actual restoration.

Contributor Author

Actually in this PR I'm indeed moving both standby updating and active restoring to the new thread, so that the stream thread is ONLY responsible for processing active tasks. Is that not what you expected from this PR?

Comment on lines -430 to -433
// for restoring active and updating standby we may prefer different poll time
// in order to make sure we call the main consumer#poll in time.
// TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
Contributor

Why did we remove this logic? AFAICT standbys are still processed in the main StreamThread and the whole point of this was to make sure we don't block active processing on polling standby tasks.

But I wasn't paying close attention to this particular issue/PR so I may have misremembered or misunderstood

Contributor Author

We are actually moving the processing of the standbys to the other thread as well, so we no longer need different poll times.


// remove corrupted partitions from the changelog reader and continue; we can still proceed
// and restore other partitions until the main thread comes to handle this exception
changelogReader.unregister(e.corruptedTaskWithChangelogs().values().stream()
Contributor

I'm not sure this is strictly incorrect since unregistering is idempotent, but it certainly seems unnecessary. Why do we need to unregister the changelogs here? We do so in ProcessorStateManager's close which should be called in handleCorruption from the main thread.
I'm also generally against unregistering something so far away from where it gets registered 🙂

Contributor Author

I think that's a fair point --- all I care about here is letting the changelogReader continue without needing to fetch from this partition any more. Since the handling of the exception is on the main thread and hence async to this thread, we do not want to keep getting the same exception again and again.

Let me think how I can refactor this piece.

advanceNowAndComputeLatency();
// check if restore thread has encountered TaskCorrupted exception; if yes
// rethrow it to trigger the handling logic
final TaskCorruptedException e = restoreThread.nextCorruptedException();
Contributor

Might be nice to gather and handle all TaskCorruptedExceptions at once rather than one per loop like this, especially since each one likely involves committing all tasks (and I would imagine that with EOS, when we get one TaskCorrupted we are likely to also have more). That can definitely be follow-on work, just putting it out there

Contributor Author

Makes sense, will try to address this in this PR.

Contributor Author

@ableegoldman I've not addressed this comment yet; it's on my list for the next PR, since this one has become too large and hence I'm only keeping the changes necessary for correctness.

Contributor

Sounds good. Just to throw one idea out that I think would be pretty small LOE: instead of queueing up new TaskCorruptedExceptions, we could just store a single TaskCorruptedException and update its taskWithChangelogs map as new corrupted tasks are detected. Since TaskCorruptedException is already multi-task, and TaskManager#handleCorruption takes a map of tasks as input, it seems natural to just expand the exception with new tasks instead of building up multiple multi-task exceptions.

I think it would also be good to have a mutable task/changelog map so we can make sure any revoked tasks/changelogs are removed from the TaskCorruptedException. Currently, it seems like the StreamThread could actually pull and rethrow a TaskCorruptedException for a task that no longer exists on the instance.

Contributor Author

Updating the fields of TaskCorruptedException could be risky since it can be read by the main thread concurrently. I think a better way would be to keep its fields immutable, but drain all the queued exceptions (which is thread-safe) and then create a new one aggregating their tasks.
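A minimal sketch of this drain-then-aggregate idea; the types here (CorruptedTasks, String, Integer) are simplified stand-ins for the real TaskCorruptedException / TaskId / TopicPartition classes.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

final class DrainAndAggregate {
    static final class CorruptedTasks extends RuntimeException {
        final Map<String, Collection<Integer>> taskWithChangelogs;
        CorruptedTasks(final Map<String, Collection<Integer>> taskWithChangelogs) {
            this.taskWithChangelogs = taskWithChangelogs;
        }
    }

    // called by the stream thread: drain everything at once (thread-safe) and merge
    // the per-exception immutable maps into a single new exception, or null if none
    static CorruptedTasks drainAndAggregate(final BlockingQueue<CorruptedTasks> queue) {
        final List<CorruptedTasks> drained = new ArrayList<>();
        queue.drainTo(drained);
        if (drained.isEmpty()) {
            return null;
        }
        final Map<String, Collection<Integer>> aggregated = new HashMap<>();
        for (final CorruptedTasks e : drained) {
            e.taskWithChangelogs.forEach((taskId, partitions) ->
                aggregated.computeIfAbsent(taskId, id -> new HashSet<>()).addAll(partitions));
        }
        return new CorruptedTasks(aggregated);
    }
}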

@@ -310,18 +313,14 @@ public void enforceRestoreActive() {
// be cleared when the corresponding task is being removed from the thread. In other words, the restore consumer
// should contain all changelogs that are RESTORING or COMPLETED
@Override
public void transitToUpdateStandby() {
public synchronized void transitToUpdateStandby() {
Contributor

I'm also wondering if it even makes sense to do just the restore in a separate thread and not also move the standbys, assuming we do indeed want to move standbys to a separate thread eventually. It just seems like we might need to do a lot of work to get things consistent in the interim that would all be thrown out when we move on to standbys.

Of course, if we don't plan to move standbys to a separate thread anytime soon then there's no problem. Just wondering if that's part of the near-future roadmap (since standby improvements do indirectly translate to restoration improvements)

Contributor Author

The ultimate plan, which is also the goal of this PR, is to move both the processing of standbys and the restoring of actives, which are both captured in the ChangelogReader, to the same separate thread, out of the main thread. Do you see a concern with this plan?

@guozhangwang
Contributor Author

Is this planned for a follow-up PR or were you just waiting to get a review of the basics before diving deeper into this work?

Yes that's the plan. More concretely, I'm going to change this condition:

if (state == State.RUNNING) { // process

In StreamThread#runOnce to

if (taskManager#hasRunnableActiveTask)

I think this is what you're asking for, right?

@guozhangwang guozhangwang changed the title KAFKA-10199: Separate restore threads [WIP] KAFKA-10199: Separate restore threads Sep 15, 2020
Contributor Author

@guozhangwang guozhangwang left a comment

@ableegoldman @vvcephei @cadonna I've rebased the PR again. It has become a bit larger than expected, so I'm only keeping the changes necessary for correctness; there are a bunch of optimizations that I'm deferring to the next PR.

The main idea is, as @cadonna suggested, to use concurrent queues as the communication mechanism between the two threads and keep the changelog reader non-thread-safe.

@@ -34,7 +34,7 @@

final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;

ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
public ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
Contributor Author

Need to make this constructor public for unit testing.

@@ -146,6 +146,7 @@ public synchronized void assign(Collection<TopicPartition> partitions) {
ensureNotClosed();
committed.clear();
this.subscriptions.assignFromUser(new HashSet<>(partitions));
this.paused.retainAll(partitions);
Contributor Author

Piggy-backed fix: when partitions are unassigned, they should also be removed from the paused set.
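For illustration, a small example of the behavior this fixes, using the real MockConsumer API (the topic name is made up):

import java.util.Arrays;
import java.util.Collections;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerPausedExample {
    public static void main(final String[] args) {
        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        final TopicPartition tp0 = new TopicPartition("changelog", 0);
        final TopicPartition tp1 = new TopicPartition("changelog", 1);

        consumer.assign(Arrays.asList(tp0, tp1));
        consumer.pause(Collections.singleton(tp1));

        consumer.assign(Collections.singleton(tp0)); // tp1 is no longer assigned
        // with the fix, paused() no longer contains tp1
        System.out.println(consumer.paused());
    }
}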

@@ -115,13 +114,16 @@ public boolean isClosed() {
@Override
public void revive() {
if (state == CLOSED) {
// clear all the stores since they should be re-registered
Contributor Author

Here's one of the tricky things we need to pay attention to: we need the task state-manager's changelog partitions so they can be de-registered asynchronously at the restore thread, and there are a few different scenarios:

  1. completely closing the task: we remove the task from the task-manager, but materialize the state-manager's changelog partitions before clearing the stores since they need to be enqueued to the restore-thread.

  2. reviving a corrupted task: we close the task but do NOT remove it from the task-manager, and we materialize the changelog partitions before clearing the stores.

  3. recycling a task: we do NOT close the task but we do remove it from the task-manager, and hence we also do not clear the state-manager's stores since we want to skip re-registering them. BUT we still need to materialize the changelog partitions to be enqueued to the restore-thread.

Contributor

Can you elaborate on what you mean by "materialize" the changelogs here?

Contributor Author

ProcessorStateManager#changelogPartitions relies on its changelogOffsets, which in turn relies on the stores map. If the stores map gets cleared, then changelogPartitions would return nothing. So in order to get the changelog partitions to send to the restore thread, we need to grab them before clearing the stores map, i.e. we need to "materialize" the changelogPartitions first --- maybe we did not use the right term here, sorry.
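A tiny sketch of that "materialize before clear" idea; the String-keyed map is a simplified stand-in for the state manager's stores / changelog bookkeeping, not the actual ProcessorStateManager code.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class MaterializeBeforeClear {
    private final Map<String, Long> storeToChangelogOffset = new HashMap<>();

    Collection<String> changelogPartitions() {
        // derived from the stores map: empty once the stores are cleared
        return storeToChangelogOffset.keySet();
    }

    Collection<String> clearAndReturnChangelogs() {
        // "materialize" (copy) the partitions before clearing, so the restore thread
        // can still be told which partitions to unregister asynchronously
        final Collection<String> materialized = new ArrayList<>(changelogPartitions());
        storeToChangelogOffset.clear();
        return materialized;
    }
}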

@@ -185,7 +181,7 @@ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
final LogContext logContext = getLogContext(standbyTask.id);

standbyTask.closeCleanAndRecycleState();
stateManager.transitionTaskType(TaskType.ACTIVE, logContext);
stateManager.prepareNewTaskType(TaskType.ACTIVE, logContext);
Contributor Author

This is another tricky part: since the restore thread checks the state-manager's type to decide whether to check for completion, for recycled tasks we should not change their type immediately; the change has to wait until they have been asynchronously de-registered (and happens right before they are re-registered) at the restore thread.

Contributor

Can you elaborate here as well? Maybe I'm missing something, but it doesn't look like the restore thread ever checks the task / state-manager type at all. For example, waitIfAllChangelogsCompleted just compares the changelog reader's allChangelogs() vs completedChangelogs(). Neither of those touches on the active/standby status of the task. Should they?

Contributor Author

The "check" I meant above is in StoreChangelogReader in various places, which is called by the restore thread. When an active task recycles to a standby task, the following can happen:

  1. task is closed.
  2. task type is changed to standby.
  3. task transits to CREATED state.

Previously the changelogs were synchronously removed at step 1 and re-added at step 3. But now they are removed / re-added asynchronously, say as steps 4 / 5. So if we still changed the type at step 2, the changelog reader on the other thread could access the type in between and get into an error state. Therefore I have to defer step 2 to between steps 4 and 5, executed by the restore thread rather than the main thread.
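A hedged sketch of this two-phase type change; the method names follow the diff in this PR (prepareNewTaskType / maybeCompleteTaskTypeConversion), but the body is illustrative rather than the actual implementation.

import java.util.concurrent.atomic.AtomicReference;

final class StateManagerTypeSketch {
    enum TaskType { ACTIVE, STANDBY }

    private volatile TaskType taskType = TaskType.ACTIVE;
    private final AtomicReference<TaskType> pendingType = new AtomicReference<>(null);

    // called by the stream thread during recycling: step 2 is only "prepared" here
    void prepareNewTaskType(final TaskType newType) {
        pendingType.set(newType);
    }

    // called by the restore thread right before re-registering the changelogs,
    // i.e. between the asynchronous unregister (step 4) and register (step 5)
    void maybeCompleteTaskTypeConversion() {
        final TaskType newType = pendingType.getAndSet(null);
        if (newType != null) {
            taskType = newType;
        }
    }

    TaskType taskType() {
        return taskType;
    }
}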

/**
* Transit to restore active changelogs mode
*/
void enforceRestoreActive();
Contributor Author

With the restore changelog reader solely owned by a single thread, we no longer need to make these public APIs.

}

private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPartition> partitions) {
final Map<TopicPartition, Long> committedOffsets;
try {
committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);
committedOffsets = fetchCommittedOffsets(partitions, groupId, adminClient);
Contributor Author

Now that the changelog reader is accessed by a separate thread, we cannot let it access the main consumer anymore, since the consumer is not thread-safe and we do not allow concurrent access.

So here I'm switching to use the admin client to get committed offsets.
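For reference, a hedged sketch of fetching committed offsets via the admin client; the actual ClientUtils#fetchCommittedOffsets used in this PR may differ in details such as timeouts and error handling.

import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetsViaAdmin {
    static Map<TopicPartition, Long> committedOffsets(final Admin adminClient,
                                                      final String groupId,
                                                      final Set<TopicPartition> partitions) throws Exception {
        final Map<TopicPartition, OffsetAndMetadata> committed = adminClient
            .listConsumerGroupOffsets(groupId,
                new ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList<>(partitions)))
            .partitionsToOffsetAndMetadata()
            .get();

        // partitions without a committed offset come back with null metadata
        return committed.entrySet().stream()
            .filter(entry -> entry.getValue() != null)
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
    }
}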


import java.util.Map;
import java.util.Set;

public class StateMachineTask extends AbstractTask implements Task {
Contributor Author

Extracted this mock class since it is now needed in multiple unit tests.

Contributor

@ableegoldman ableegoldman left a comment

Sorry for leaving so many comments 😬 Just trying to make sure this is all understandable since it's a big change

@@ -65,10 +65,6 @@ public ProcessorContextImpl(final TaskId id,

@Override
public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
if (stateManager.taskType() != TaskType.ACTIVE) {
Contributor

If this is because we don't transition the state manager's type until later, maybe we should assert that the stateManager.taskType == STANDBY instead of removing the check altogether (and vice versa down below)

Contributor Author

That makes sense.

Contributor Author

Actually I realized that we call this function sometimes (e.g. upon initialization) even when the task is already active, so we cannot just reject and fail if it was not previously a standby.

item.task.changelogPartitions(),
item.task.id());
} else if (item.type == ItemType.CREATE) {
// we should only convert the state manager type right StateRestoreThreadTest.javabefore re-registering the changelog
Contributor

Looks like something extra slipped into this comment

Contributor Author

Ah thanks!

log.debug("Preparing to transit state manager for task {} from {} to {}", taskId, taskType, newType);
}

void maybeConvertToNewTaskType() {
Contributor

nit: call it maybeCompleteTaskTypeConversion or completeTaskTypeConversionIfNecessary? Right now it kind of sounds like we're just randomly converting the task to a new type but really we're just finishing that conversion off

@@ -220,11 +224,6 @@ public void closeDirty() {
@Override
public void closeCleanAndRecycleState() {
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
if (state() == State.SUSPENDED) {
Contributor

If it's still true that a task should always be suspended before recycling, let's keep the check here

Contributor Author

Makes sense.

@@ -200,20 +200,23 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
}
task.closeDirty();

removedTasks.put((AbstractTask) task, task.changelogPartitions());
((AbstractTask) task).stateMgr.clear();
Contributor

This seems not good... we shouldn't be casting to an abstract class. Let's just add whatever we need as a method on the Task interface.
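A sketch of that suggestion; the method name clearStateStores is hypothetical, not necessarily what the PR ends up adding.

import java.util.Collection;

import org.apache.kafka.common.TopicPartition;

interface TaskSketch {
    Collection<TopicPartition> changelogPartitions();

    // would replace ((AbstractTask) task).stateMgr.clear() at the call site above
    void clearStateStores();
}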

Comment on lines 58 to 63
* ChangelogReader is created and shared by the stream thread and restore thread. It is used for both updating standby tasks and
* restoring active tasks. It manages the restore consumer, including its assigned partitions, when to pause / resume
* these partitions, etc.
* <p>
* This object is thread-safe for concurrent access between the two threads.
* <p>
Contributor

These comments need to be updated; it's no longer shared or thread-safe, right?

@@ -107,6 +107,7 @@

private final ProcessorStateManager stateManager;

// NOTE only this field may be concurrently accessed by stream and restore threads
Contributor

Also out of date?

// source of the truth of the current registered changelogs;
// NOTE a changelog would only be removed when its corresponding task
// is being removed from the thread; otherwise it would stay in this map even after completed
//
// this map may be concurrently accessed by different threads and hence need to be guarded
Contributor

I'll stop pointing them all out and let you give this class a pass for any out of date comments 🙂

final TaskId taskId = changelogs.get(partition).stateManager.taskId();
taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
final ChangelogMetadata metadata = changelogs.get(partition);
if (metadata != null) {
Contributor

Can metadata be null? Doesn't seem like we should ever get an InvalidOffsetException for changelogs we don't have. At the very least we should log a warning, if not throw IllegalStateException

Contributor Author

Makes sense.

guozhangwang and others added 3 commits October 5, 2020 20:45
…nals/StateRestoreThread.java

Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
…nals/StreamThread.java

Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
…nals/StreamThread.java

Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
guozhangwang and others added 3 commits October 5, 2020 22:02
…nals/StreamThread.java

Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
…nals/StreamThread.java

Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
…nals/StreamThread.java

Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
Contributor Author

@guozhangwang guozhangwang left a comment

Replied to your comments.



private static class TaskItem {
private final ItemType type;
private final AbstractTask task;
Contributor Author

Yeah AbstractTask#stateManager is the reason I went with AbstractTask. I can add a new method to Task.

final List<TaskItem> items = new ArrayList<>();
taskItemQueue.drainTo(items);

if (!items.isEmpty()) {
Contributor Author

Ack


if (!items.isEmpty()) {
for (final TaskItem item : items) {
// TODO: we should consider also call the listener if the
Contributor Author

Yeah I meant this exactly (you brought this up when I was in the middle of it, so I added this comment to remind anyone in the future). Created a ticket for this and updated the comment: https://issues.apache.org/jira/browse/KAFKA-10575

item.task.changelogPartitions(),
item.task.id());
}
}
Contributor Author

I actually forgot to remove the REVIVE type... a revived task will be treated as a CLOSE followed by a CREATE, and since we always drain the CLOSE'd tasks first and then the CREATE'd tasks, revived tasks will always be processed as CLOSE and then as CREATE, in order.



// try complete restoration if there are any restoring tasks
if (taskManager.tryToCompleteRestoration(restoreThread.completedChangelogs())) {
log.debug("Completed restoring all tasks now");
Contributor Author

Sounds good. I actually let it return a boolean for testing purposes, and I agree that in non-testing code we do not care much about the returned boolean. I will update accordingly.

Comment on lines 654 to 655
// TODO: we should allow active tasks processing even if we are not yet in RUNNING
// after restoration is moved to the other thread
Contributor Author

We do have a JIRA ticket for this, and I do plan to do it right after this PR (since this PR is really a prerequisite for that). I will link the ticket here.

log.debug("Initializing newly created tasks {} under state {}",
initializedTasks.stream().map(AbstractTask::id).collect(Collectors.toList()), state);
}

Contributor Author

Ack.


@guozhangwang
Contributor Author

test this

Contributor

@cadonna cadonna left a comment

Hey @guozhangwang, thank you for the PR and sorry for my late review. I did a pass over it, except the tests, and left some comments.

Comment on lines +616 to +629
// we need to first add any closed revoked/corrupted/recycled tasks and then add the initialized tasks to update the changelogs of revived/recycled tasks
restoreThread.addClosedTasks(taskManager.drainRemovedTasks());

// try to initialize created tasks that are either newly assigned, recycled, or revived from corrupted tasks
final List<Task> initializedTasks = taskManager.tryInitializeNewTasks();
if (!initializedTasks.isEmpty()) {
log.info("Initialized new tasks {} under state {}, will start restoring them",
initializedTasks.stream().map(Task::id).collect(Collectors.toList()), state);

restoreThread.addInitializedTasks(initializedTasks);
}

// try complete restoration if there are any restoring tasks
taskManager.tryToCompleteRestoration(restoreThread.completedChangelogs());
Contributor

Would it be possible to move this code into the task manager and have just one call to the task manager here that returns whether restoration is done?

Contributor Author

I intentionally do not make the restore thread part of the task manager. Moving forward, when we have one consumer per client instead of one per stream thread, I plan to have only a single restore thread instead of N threads (one per stream thread), and hence I would like to "isolate" the thread from a specific task manager starting early. Ditto for the other comment.

Comment on lines +640 to +643
final RuntimeException e = restoreThread.pollNextExceptionIfAny();
if (e != null) {
throw e;
}
Contributor

I think that could also be done in the task manager. What I mean is to call a method on the task manager that then throws the exception.

@@ -946,14 +934,14 @@ private void completeShutdown(final boolean cleanRun) {
log.info("Shutting down");

try {
taskManager.shutdown(cleanRun);
restoreThread.shutdown(10_000L);
Contributor

I also think that it makes sense to pass in the timeout to shutdown(). At least for the code path originating from KafkaStreams#close(), we can apply the timeout passed to close().
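A minimal sketch of a timeout-aware shutdown along those lines; the field and method names are illustrative, and the real StateRestoreThread may also need to wake up a blocking poll.

import java.util.concurrent.atomic.AtomicBoolean;

final class RestoreThreadShutdownSketch extends Thread {
    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    @Override
    public void run() {
        while (isRunning.get()) {
            // drain task items, restore a batch of records, publish exceptions ...
        }
    }

    // the caller (e.g. the code path from KafkaStreams#close(timeout)) passes its remaining budget
    boolean shutdown(final long timeoutMs) throws InterruptedException {
        isRunning.set(false);  // ask the run loop to exit
        join(timeoutMs);       // wait at most the given budget
        return !isAlive();     // report whether shutdown completed in time
    }
}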

Comment on lines 170 to 171
// a task being recycled maybe in both closed and initialized tasks,
// and hence we should process the closed ones first and then initialized ones
Contributor

What do you want to say with this comment?

Contributor Author

Updated the comment a bit, hopefully it is clearer now.

Comment on lines 199 to 227
// try to restore some changelogs
final long startMs = time.milliseconds();
try {
final int numRestored = changelogReader.restore();
// TODO KIP-444: we should record the restoration related metrics including restore-ratio
log.debug("Restored {} records in {} ms", numRestored, time.milliseconds() - startMs);
} catch (final TaskCorruptedException e) {
log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e);

// remove corrupted partitions from the changelog reader and continue; we can still proceed
// and restore other partitions until the main thread comes to handle this exception
changelogReader.unregister(e.corruptedTaskWithChangelogs().values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList()));

corruptedExceptions.add(e);
} catch (final StreamsException e) {
// if we are shutting down, the consumer could throw interrupt exception which can be ignored;
// otherwise, we re-throw
if (!(e.getCause() instanceof InterruptException) || isRunning.get()) {
throw e;
}
} catch (final TimeoutException e) {
log.info("Encountered timeout when restoring states, will retry in the next loop");
}

// finally update completed changelogs
completedChangelogs.set(changelogReader.completedChangelogs());
Contributor

I would extract this code into a method named tryRestoreChangelogs().

Contributor Author

Ack.

Comment on lines 172 to 197
final List<TaskItem> items = new ArrayList<>();
taskItemQueue.drainTo(items);

for (final TaskItem item : items) {
// TODO KAFKA-10575: we should consider also call the listener if the
// task is closed but not yet completed restoration
if (item.type == ItemType.CLOSE) {
changelogReader.unregister(item.changelogPartitions);

log.info("Unregistered changelogs {} for closing task {}",
item.task.changelogPartitions(),
item.task.id());
} else if (item.type == ItemType.CREATE) {
// we should only convert the state manager type right before re-registering the changelog
item.task.stateManager().maybeCompleteTaskTypeConversion();

for (final TopicPartition partition : item.changelogPartitions) {
changelogReader.register(partition, item.task.stateManager());
}

log.info("Registered changelogs {} for created task {}",
item.task.changelogPartitions(),
item.task.id());
}
}
items.clear();
Contributor

I would extract this code into a method with a meaningful name.

Contributor Author

Ack.

@@ -55,16 +55,16 @@
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;

/**
* ChangelogReader is created and maintained by the stream thread and used for both updating standby tasks and
* ChangelogReader is created and shared by the stream thread and restore thread. It is used for both updating standby tasks and
Contributor

As far as I can see, the store changelog reader is not shared but exclusively maintained by the restore thread.

@@ -93,16 +95,14 @@
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
private java.util.function.Consumer<Set<TopicPartition>> resetter;

TaskManager(final ChangelogReader changelogReader,
final UUID processId,
TaskManager(final UUID processId,
Contributor

What do you think of passing the restore thread to the TaskManager and starting it in its constructor? I would even propose creating and starting the restore thread in the TaskManager's constructor. In such a way, the specifics of restoration are hidden in the TaskManager. WDYT?

Contributor Author

See my other comment :)

@guozhangwang
Contributor Author

System tests succeeded: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4232/

Will deploy soak for a few days.
