
KAFKA-9501: convert between active and standby without closing stores #8248

Conversation

Contributor

@ableegoldman ableegoldman commented Mar 7, 2020

This PR has gone through several significant transitions of its own, but here's the latest:

  1. TaskManager just collects the tasks to transition and refers to the active/standby task creator to handle closing & recycling the old task and creating the new one. If we ever hit an exception during the close, we bail and close all the remaining tasks as dirty.
  2. The task creators tell the task to "close but recycle state". If this is successful, it tells the recycled processor context and state manager that they should transition to the new type.
  3. During "close and recycle" the task just does a normal clean close, but instead of closing the state manager it informs it to recycle itself: maintain all of its store information (most importantly the current store offsets) but unregister the changelogs from the changelog reader
  4. The new task will (re-)register its changelogs during initialization, but skip re-registering any stores. It will still read the checkpoint file, but only use the written offsets if the store offsets are not already initialized from pre-transition
  5. To ensure we don't end up with manual compaction disabled for standbys, we have to call the state restore listener's onRestoreEnd for any active restoring stores that are switching to standbys
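
As a rough illustration of the flow described above, here is a sketch of how a task creator might promote a standby to an active task. The closeAndRecycleState and createActiveTaskFromStandby names come up in this PR's diff and discussion; the wrapper method and its exact argument list here are illustrative only.

```java
// Illustrative sketch only -- how a task creator might promote a standby to active
// per the steps above; the real TaskManager / task-creator code differs in detail.
StreamTask convertStandbyToActive(final StandbyTask standbyTask,
                                  final Set<TopicPartition> inputPartitions) {
    // Steps 2/3: a normal clean close that keeps the state manager alive -- store
    // offsets are retained, but the changelogs are unregistered from the changelog reader
    standbyTask.closeAndRecycleState();

    // Step 2: the recycled state manager (and processor context) are told to switch to
    // the new task type, and everything else is built just like for a brand-new task
    return activeTaskCreator.createActiveTaskFromStandby(standbyTask, inputPartitions, mainConsumer);
}
```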

@ableegoldman ableegoldman changed the title KAFKA-9501: convert between active and standby without closing stores [WIP] KAFKA-9501: convert between active and standby without closing stores Mar 9, 2020
@@ -132,47 +130,74 @@ private static String getTaskProducerClientId(final String threadClientId, final
partitions
);

if (threadProducer == null) {
Contributor Author

Factored out the following to be reused in the case of transitioning from standby: a standby has only the state manager, so everything else must be created just like for a new task.

@ableegoldman ableegoldman force-pushed the KIP-441-fix-standby-to-active-transition branch 2 times, most recently from 9874887 to dddbfa0 Compare March 12, 2020 23:16
@ableegoldman ableegoldman force-pushed the KIP-441-fix-standby-to-active-transition branch 2 times, most recently from 46fba60 to 98a230a Compare April 9, 2020 05:01
@ableegoldman ableegoldman force-pushed the KIP-441-fix-standby-to-active-transition branch 2 times, most recently from 1b0d40b to dbdca84 Compare April 22, 2020 22:14
@ableegoldman ableegoldman changed the title [WIP] KAFKA-9501: convert between active and standby without closing stores KAFKA-9501: convert between active and standby without closing stores Apr 23, 2020
@ableegoldman
Contributor Author

Finally ready for review @vvcephei @cadonna @guozhangwang

@ableegoldman ableegoldman force-pushed the KIP-441-fix-standby-to-active-transition branch 2 times, most recently from c0a0f90 to 4202489 Compare April 24, 2020 01:58
@@ -148,6 +161,82 @@ public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() th
);
}

@Test
public void shouldRecycleStateFromStandbyTaskPromotedToActiveTask() throws Exception {
Contributor Author

Wasn't really sure where to put this test, but this class seemed close enough. Verified that this does indeed fail on trunk without this fix

Contributor

This seems fine to me.

Contributor

@vvcephei vvcephei left a comment

Wow, thanks for this @ableegoldman !

I've left some comments, but I think the high-level approach looks good!

Aside from the code comments, it seems like it would be nice to have a top-level behavioral test verifying that we can actually do what the reporter of KAFKA-9501 was trying to do. Namely, start the app with a standby and in-memory stores, write some data, then fail over to the standby without having to recover the store (i.e., the actual in-memory store instance is reused).

Is that test here, and I just overlooked it?

I do appreciate the more isolated tests. I'm just suggesting to verify the feature end-to-end as well.

Thanks again,
-john

@@ -156,7 +156,7 @@
files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>

<suppress checks="MethodLength"
files="(KTableImpl|StreamsPartitionAssignor.java)"/>
files="(KTableImpl|StreamsPartitionAssignor|TaskManager).java"/>
Contributor

Ok, it seems like this one at least is avoidable.

Contributor Author

Man, this is like being the person to finish the tequila bottle and having to eat the worm. Everyone's been adding to TaskManager#handleAssignment a few lines at a time and I was the lucky one to push it over the checkstyle edge 🍀

Contributor Author

@ableegoldman ableegoldman May 1, 2020

Kidding aside, Guozhang keeps telling me that @mjsax is (or is about to be) working on a PR to clean up the TaskManager. I wanted to keep the conflicts to a minimum, especially if it'll all be rewritten soon anyway. Would you be satisfied if I made a ticket for myself to loop back around later?

Member

Given the current progress on KIP-447 testing, I would start working on the refactoring next week. Keeping fingers crossed.

Contributor

Fine by me!

By the way, I think you mean getting to eat the worm. ;)

Contributor Author

I think we can actually remove the suppression for StreamsPartitionAssignor though. So net neutral style violations 👍

Contributor

Oh, awesome! Maybe we should try to remove them all at some point, to figure out which ones we really still need.

.collect(Collectors.toSet());
.stream()
.map(taskId -> getTaskProducerClientId(threadId, taskId))
.collect(Collectors.toSet());
Contributor

It seems like duelling formatters here and above. Do you want to propose that it's better this way?

Contributor Author

I don't really have skin in the game, but I would argue that this is more consistent with the rest of the codebase.

Comment on lines 571 to 575
if (taskType == TaskType.ACTIVE) {
taskType = TaskType.STANDBY;
} else {
taskType = TaskType.ACTIVE;
}
Contributor

It seems a bit "bold" here to assume that we want to flip the active/standby state. The TaskManager objectively knows what type it wants the task to become, so it seems it should just inform us of the desired task type in prepareForRecycle.

Or, better yet, just null it out and we can set it during createStandbyTaskFromActive and createActiveTaskFromStandby. This kind of thing might also be nice, since we could have a more explicit lifecycle for this object:

  • I'm ready (created, valid, good to go)
  • I'm getting recycled
  • I'm closed

Then, we can ensure that these "createXFromY" methods cleanly take the state manager from "closed" to "ready".
I'm not saying to add another state enum, but having a clearly defined lifecycle will help us later in maintenance.
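
As a rough illustration of the explicit lifecycle being suggested here, a minimal sketch follows. The phase enum and method names are hypothetical, not what the PR actually adds.

```java
// Hypothetical sketch of an explicit state-manager lifecycle; names are illustrative.
enum TaskType { ACTIVE, STANDBY }          // stand-in for the internal Task.TaskType
enum Phase { READY, RECYCLING, CLOSED }

class StateManagerLifecycleSketch {
    private Phase phase = Phase.READY;
    private TaskType taskType;

    void prepareForRecycle() {
        phase = Phase.RECYCLING;
        taskType = null;                   // the TaskManager decides the new type later
    }

    // called from createStandbyTaskFromActive / createActiveTaskFromStandby
    void transitionTo(final TaskType newType) {
        if (phase != Phase.RECYCLING) {
            throw new IllegalStateException("can only set the task type while recycling");
        }
        taskType = newType;
        phase = Phase.READY;
    }
}
```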

Contributor Author

That makes sense to me, I'll try it out

Contributor Author

Alright, I reworked things slightly here. By nulling out the task type during close/recycle we can technically get away without the recyclingState flag at all, but I thought it might be a good idea to keep the two distinct to avoid future messes.

Contributor Author

Actually, it turns out nulling it during close causes some tests to fail 😕 (probably the changelog reader). I'll just revert that part for now and look into it next week.

@ableegoldman ableegoldman force-pushed the KIP-441-fix-standby-to-active-transition branch from fe9a8ba to 08836c5 Compare May 2, 2020 03:30
Contributor

@guozhangwang guozhangwang left a comment

@ableegoldman thanks for the PR. I made a pass over the non-testing part. I had a high-level meta comment trying to avoid the closure / re-creation procedure on the task-manager, lmk wdyt.

} else {
// check for tasks that were owned previously but have changed active/standby status
final boolean isTransitioningType = activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id());
if (isTransitioningType) {
Contributor

This is a meta comment: I'm a bit inclined to suggest that we do this recycling on the task manager, rather than calling task.close() inside the task manager and having close() internally check a boolean for whether it is a prepareRecycle.

Instead, we can do the following:

  1. Add a Task#convertTo(TaskType) interface which would return an active / standby task copying the fields of the original task, starting out in the RESTORING state.

  2. For an active task, the implementation would be:

  • first transit to the RESTORING state (we would allow SUSPENDED to transit to RESTORING too, so if it is not in CREATED, we can first suspend it and then transit to RESTORING).

  • and then return a newly created standby task initialized in the RESTORING state.

  3. For a standby task, the implementation would be:

  • first transit to the RESTORING state (which would usually be a no-op).

  • and then return a newly created active task initialized in the RESTORING state.

Also I realized that recordCollector.initialize() in the active task should be moved from initializeIfNeeded to completeRestoration. This is a minor bug that may block this proposal --- I will prepare a PR fixing this.

  4. Then on the task manager, for those convertible tasks we would call convertTo instead of close / re-create via the task creators.

The key behind this proposal is that:

  • Suspended and Restoring states are actually the same for active and standby tasks.
  • In the future when we remove Suspended state we would just have Restoring.
  • Active and Standby's Restoring state are actually the same in terms of functionality.
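
For illustration, the interface being proposed in this comment could look roughly like the following sketch; this is a reading of the proposal, not code from the PR.

```java
// Sketch of the proposed Task#convertTo(TaskType); illustrative only.
interface Task {
    enum TaskType { ACTIVE, STANDBY }

    // Returns a new task of the requested type, copying this task's fields
    // (state manager, partitions, etc.) and starting out in the RESTORING state.
    Task convertTo(TaskType newType);
}
```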

Contributor Author

Yeah, this does sound appealing, and is more or less what I first tried out when I started work on this (a long long time ago). My take at the time was that it added some complexity to the already-complicated TaskManager#handleAssignment while gaining little benefit, and was less "future-proof" compared to the current approach. For example, it seems safer to just reuse the bare minimum of the task needed to recycle the state rather than rely on assumptions about which task states might be functionally equivalent to which states of the other type. It also seemed weird to require each task to know how to create a task of the opposite type; I felt this was more appropriate for the task creators.

@ableegoldman ableegoldman force-pushed the KIP-441-fix-standby-to-active-transition branch from c53eb4e to b119a8e Compare May 8, 2020 23:25
}

@Test
public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore() throws Exception {
Contributor Author

Alright I tried to shore up the test to verify that we actually do not restore anything in addition to not closing the store itself. This should be closer to testing the specific behavior pointed out in the ticket

@vvcephei
Contributor

test this please

1 similar comment
@vvcephei
Contributor

test this please

@ableegoldman ableegoldman force-pushed the KIP-441-fix-standby-to-active-transition branch 7 times, most recently from 3da04f4 to dd7ce67 Compare May 18, 2020 21:53
* Removes the passed in partitions from the set of changelogs
* @param revokedPartitions the set of partitions to remove
*/
void remove(Collection<TopicPartition> revokedPartitions);
Contributor Author

Renamed to unregister and moved to ChangelogRegister interface
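
Roughly, the resulting interface would look like the sketch below, reconstructed from the description in this comment; the exact signatures may differ from the final code.

```java
import java.util.Collection;
import org.apache.kafka.common.TopicPartition;

// Sketch of the ChangelogRegister interface described above.
interface ChangelogRegister {
    // Register a changelog partition so the changelog reader restores it
    // on behalf of the given task's state manager.
    void register(TopicPartition partition, ProcessorStateManager stateManager);

    // Stop restoring the given changelog partitions, e.g. when a task is closed or recycled.
    void unregister(Collection<TopicPartition> partitions);
}
```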

final List<TopicPartition> revokedInitializedChangelogs = new ArrayList<>();

for (final TopicPartition partition : revokedChangelogs) {
final ChangelogMetadata changelogMetadata = changelogs.remove(partition);
if (changelogMetadata != null) {
if (changelogMetadata.state() != ChangelogState.REGISTERED) {
if (triggerOnRestoreEnd && changelogMetadata.state().equals(ChangelogState.RESTORING)) {
Contributor Author

Alright, the situation here is that we need to make sure we toggle bulk loading off for any active restoring tasks that convert to standby. Rather than try and force a direct call to toggleForBulkLoading on the store itself I figured we should just call onRestoreEnd. Technically, restoration is ending. It just happens to be due to type transition, rather than restore completion.

I figured this might be relevant for users of custom stores, which might do something similar to bulk loading that they wish to turn off for standbys. But since this is only relevant to the underlying store, and doesn't mean we have actually finished restoring a task, we should only call the specific store's listener -- and not the user registered global listener.
WDYT?
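
Concretely, the idea amounts to something like the sketch below inside the changelog reader's unregister path; it is a simplified rendering of the snippet above, and the restoreListenerFor/storeNameFor/totalRestoredSoFar accessors are hypothetical stand-ins for the metadata lookups.

```java
// Simplified sketch; the helper accessors here are hypothetical stand-ins.
if (triggerOnRestoreEnd && changelogMetadata.state() == ChangelogState.RESTORING) {
    // Restoration is "ending" because the store is converting to standby. Notify only
    // the store-specific listener (so it can e.g. turn bulk loading off and re-enable
    // compaction), but NOT the user-registered global listener -- restoration did not complete.
    final StateRestoreListener storeListener = restoreListenerFor(partition);
    storeListener.onRestoreEnd(partition, storeNameFor(partition), totalRestoredSoFar(partition));
}
```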

Contributor

I think this makes sense. The restore listener / bulk loading interaction is a bit wacky, but it seems reasonable to just work around it for now.

Just to play devil's advocate briefly, though, is it not true for all listeners that the restore has ended, for exactly the reason you cited above?

Contributor Author

I assume you're referring to the user registered global listener? I was trying to imagine what users actually might be using this for, and figured a major reason was to alert when a store is ready for IQ. Obviously, the store is not in fact ready for IQ in this case. I assume the worst that could happen is they'll just get an exception saying the store is not in fact ready if they do try to query it, but it still seems likely to cause confusion.

If you're also wondering about the seemingly arbitrary distinction made between the store-specific listener and the global one, it seems like the intent of the store-specific listener is to take action on a particular store as it transitions between restoring and not. The store-specific listener has a reference to the actual store, and can for example toggle it for bulk loading.

But IIUC the global listener does not have a reference to any actual stores, and thus its purpose seems to be more for alerting on the completion of restoration rather than taking some action on the store as restoration begins/ends.

Restoration completion =/= restoration ending, but unfortunately we just have the one callback

Contributor

Yeah, that last sentence is why I was thinking it would make sense to just call all the listeners, not just the inner ones. If you implemented the listener so that you could log or collect metrics to watch the restore process, it seems like it would be strange just to see the restoration disappear, and then a new restoration start (for the post-recycled task), without the prior one ever having "ended". It just seems natural to see an end for every start, even if the "end" doesn't mean "completed". But I can see how that could also be confusing.

I'm not totally sure what the best call here is, so maybe we should just defer to your judgement, since you're the closest to this code right now.

Contributor Author

a new restoration start (for the post-recycled task), without the prior one ever having "ended".

This could happen if you have a restoring task that transitions to standby and then back to restoring. But invoking onRestoreStart twice in a row in that case is the current behavior, so your listener should presumably already be handling the situation. My impression is that users understand the global restore listener's onRestoreEnd to mean that restoration has completed, and invoking it before this would be an unexpected behavior change.

I think in an ideal world we would decouple these two callbacks to make the distinction more apparent.

Contributor Author

Ah actually @guozhangwang filed a ticket recently for exactly that: https://issues.apache.org/jira/browse/KAFKA-10005

Contributor

Ok, I'm satisfied with this conclusion.

Contributor

Yup, I propose to decouple these two and only allow restore callback at the per-store level and restore listener at the global level. We will always open the store with compaction disabled etc when we are transiting to restoring, and after we've done the restoration (for active) we will do a one-off compaction, and then reopen the store with configs overridden.

Contributor

@vvcephei vvcephei left a comment

Hey @ableegoldman , I'm still giving the tests a final pass, but I had a few non-test comments for you.

@Override
public void closeAndRecycleState() {
prepareClose(true);
close(true, true);
Contributor

When I introduced closeClean and closeDirty, I resisted the urge to inline close(boolean) only to control the LOC of the change. Having more branches and flags is generally more of a liability than a few duplicated statements.

Now that we have two boolean flags (again) and a new branch in the internal close method, I'd be much more inclined to inline it. But we can do this in a follow-on PR, if you prefer.
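
For example, instead of a shared close driven by two flags, each path could be spelled out on its own, roughly as in the sketch below; this is an illustration of the suggestion, not the shape of the code in this PR, and the closeStateManager/recycleStateManager helpers are hypothetical.

```java
// Illustrative sketch: three explicit close paths instead of one flag-driven close().
public void closeClean() {
    prepareClose(true);
    closeStateManager(true);      // checkpoint and close the stores
    transitionTo(State.CLOSED);
}

public void closeDirty() {
    prepareClose(false);
    closeStateManager(false);     // close without checkpointing
    transitionTo(State.CLOSED);
}

public void closeAndRecycleState() {
    prepareClose(true);
    recycleStateManager();        // keep store offsets, only unregister the changelogs
    transitionTo(State.CLOSED);
}
```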

Comment on lines 297 to 299
} else {
dirtyTasks.addAll(tasksToRecycle);
}
Contributor

This seems like an odd case. Am I right in reading this as, "something went wrong, and we don't know what it was, so we're just going to assume the worst and dump all the tasks that we were hoping to recycle"?

Contributor Author

Yeah, it still seems to me that if we have to close any tasks as dirty we will ultimately have to do so for them all (as in handleLostAll). But that's a big assumption, and even if it's true now, it may not always be... I'll remove this.

@ableegoldman ableegoldman force-pushed the KIP-441-fix-standby-to-active-transition branch 2 times, most recently from 23057e6 to f40c574 Compare May 28, 2020 00:23
Comment on lines -186 to -187
final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new TreeMap<>(activeTasks);
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new TreeMap<>(standbyTasks);
Contributor Author

@guozhangwang was there a reason for these to be TreeMaps?

Contributor

I don't think there was a reason besides determinism for debugging, etc.

@guozhangwang
Contributor

test this please

1 similar comment
@guozhangwang
Contributor

test this please

Contributor

@guozhangwang guozhangwang left a comment

LGTM.

Contributor

@vvcephei vvcephei left a comment

Ok! I've completed my review. Thanks, @ableegoldman !

@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS);
public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
Contributor

Why do we need to remove the Rule annotation?

Contributor Author

I made the cluster non-static because there was poor isolation between the tests and, for example, setCommittedOffset would often fail. This was just the "lazy" solution; we could also make sure to give them all unique application ids.

Contributor

I see. Maybe making it a Rule instead of a ClassRule is the solution you're looking for?

But yes, there's a middle ground, which is to use uniquely named resources for each test, for which we have org.apache.kafka.streams.integration.utils.IntegrationTestUtils#safeUniqueTestName.
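
For example, something along these lines keeps the cluster static while still isolating tests; this sketch assumes safeUniqueTestName takes the test class and a JUnit TestName rule, and the field names are illustrative.

```java
// Sketch of per-test unique naming with a shared static cluster.
@Rule
public final TestName testName = new TestName();

@Before
public void setup() throws Exception {
    final String safeTestName = safeUniqueTestName(getClass(), testName);
    appId = "app-" + safeTestName;
    inputTopic = "input-" + safeTestName;
    CLUSTER.createTopic(inputTopic, 2, 1);   // each test gets its own topics and app id
}
```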

Contributor Author

What's the advantage of a Rule over just...nothing?

Mostly I just didn't want to hard-code a bunch of names myself. But safeUniqueTestName sounds like what I need.

Contributor Author

Never mind, I take it the answer is "it calls start for you, since the embedded cluster extends ExternalResource"?

Contributor Author

I remember now, the main reason for the poor test isolation was different tests writing various amounts of data to the different topics at specific offsets. Honestly I'm not sure why this wasn't causing problems before 🤔
Although...I notice that the first test writes 10000 keys to the input topic starting at offset 0, while the second test to write to the input topic does so starting at offset 10000. Clearly I should have looked at all the other tests in the class, counted them up, and then just started writing data at the highest offset

@@ -1247,4 +1270,33 @@ public void waitForNextStableAssignment(final long maxWaitMs) throws Interrupted
);
}
}

public static class TrackingStateRestoreListener implements StateRestoreListener {
Contributor

This isn't threadsafe, but it looks like we're using it from multiple threads during the tests.
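
A thread-safe variant is fairly straightforward since the callbacks only accumulate counts; here is a sketch using concurrent collections, which may differ from the actual fix in the PR.

```java
// Sketch of a thread-safe tracking listener: StateRestoreListener callbacks may fire
// from multiple stream threads, so use a concurrent map and atomic counters.
public static class TrackingStateRestoreListener implements StateRestoreListener {
    public final Map<TopicPartition, AtomicLong> changelogToTotalNumRestored = new ConcurrentHashMap<>();

    @Override
    public void onRestoreStart(final TopicPartition topicPartition,
                               final String storeName,
                               final long startingOffset,
                               final long endingOffset) {
        changelogToTotalNumRestored.put(topicPartition, new AtomicLong());
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored) {
        changelogToTotalNumRestored
            .computeIfAbsent(topicPartition, tp -> new AtomicLong())
            .addAndGet(numRestored);
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition,
                             final String storeName,
                             final long totalRestored) { }

    public long totalRestored() {
        return changelogToTotalNumRestored.values().stream().mapToLong(AtomicLong::get).sum();
    }
}
```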

Contributor Author

Whoops

Comment on lines -1471 to -1473
// make sure we also remove the changelog partitions from the changelog reader
changeLogReader.remove(eq(singletonList(changelog)));
expectLastCall();
Contributor

I feel like I missed something here. We don't expect the changelog to get unregistered during close anymore?

Contributor Author

The "unregistration" is now handled by the ProcessorStateManager, which is mocked in this test. There are new tests in ProcessorStateManagerTest that verify we unregister the partitions during close/recycle

Contributor

Thanks.

null,
KEY_SERIALIZER,
VALUE_SERIALIZER
((RecordCollector.Supplier) context).recordCollector().send(
Contributor

One high-level concern I've developed during this review is whether there's any possibility that something like this could leak into the public API. I.e., is it possible that a Processor, Transformer, or StateStore could have cached some reference from the context that would become invalid when the context gets recycled, similar to the way this recordCollector (which is not public, I know) did?

What's your take on that, @ableegoldman ?

Contributor Author

Well, only active tasks have a topology, and we don't initialize the topology until everything has been cleanly recycled. So by the time init is being called and the context is being used, it should be all up-to-date with the active task references.

Of course that only applies to the Processor/Transformer half of the question. With StateStores we're obviously still calling init for standby tasks, and more. But nothing in the public ProcessorContext interface gets recycled. Only the cache, record collector, and StreamTask have to be updated, so this should all be totally transparent (unless they're doing something they shouldn't be)

Contributor

Ok. Thanks.

@ableegoldman ableegoldman force-pushed the KIP-441-fix-standby-to-active-transition branch from 9f47546 to 8438033 Compare May 28, 2020 22:18
@vvcephei
Contributor

Test this please

1 similar comment
@vvcephei
Contributor

Test this please

Contributor

@vvcephei vvcephei left a comment

Thanks @ableegoldman !

streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
Contributor

Note for the future: it's not necessary to prefix the temp directory.

@guozhangwang guozhangwang merged commit 9d52dec into apache:trunk May 29, 2020
@guozhangwang
Contributor

Merged to trunk.

guozhangwang pushed a commit that referenced this pull request May 29, 2020
…#8248)

This PR has gone through several significant transitions of its own, but here's the latest:

* TaskManager just collects the tasks to transition and refers to the active/standby task creator to handle closing & recycling the old task and creating the new one. If we ever hit an exception during the close, we bail and close all the remaining tasks as dirty.

* The task creators tell the task to "close but recycle state". If this is successful, it tells the recycled processor context and state manager that they should transition to the new type.

* During "close and recycle" the task just does a normal clean close, but instead of closing the state manager it informs it to recycle itself: maintain all of its store information (most importantly the current store offsets) but unregister the changelogs from the changelog reader

* The new task will (re-)register its changelogs during initialization, but skip re-registering any stores. It will still read the checkpoint file, but only use the written offsets if the store offsets are not already initialized from pre-transition

* To ensure we don't end up with manual compaction disabled for standbys, we have to call the state restore listener's onRestoreEnd for any active restoring stores that are switching to standbys

Reviewers: John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Contributor

Cherry-picked to 2.6 as well. cc @mjsax for rebasing

Kvicii pushed a commit to Kvicii/kafka that referenced this pull request May 30, 2020
* 'trunk' of github.com:apache/kafka: (36 commits)
  Remove redundant `containsKey` call in KafkaProducer (apache#8761)
  KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723)
  KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749)
  KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238)
  KAFKA-9501: convert between active and standby without closing stores (apache#8248)
  KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739)
  MINOR: Log the reason for coordinator discovery failure (apache#8747)
  KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705)
  MINOR: remove unnecessary timeout for admin request (apache#8738)
  MINOR: Relax Percentiles test (apache#8748)
  MINOR: regression test for task assignor config (apache#8743)
  MINOR: Update documentation.html to refer to 2.6 (apache#8745)
  MINOR: Update documentation.html to refer to 2.5 (apache#8744)
  KAFKA-9673: Filter and Conditional SMTs (apache#8699)
  KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (apache#8720)
  KAFKA-10052: Harden assertion of topic settings in Connect integration tests (apache#8735)
  MINOR: Slight MetadataCache tweaks to avoid unnecessary work (apache#8728)
  KAFKA-9802; Increase transaction timeout in system tests to reduce flakiness (apache#8736)
  KAFKA-10050: kafka_log4j_appender.py fixed for JDK11 (apache#8731)
  KAFKA-9146: Add option to force delete active members in StreamsResetter (apache#8589)
  ...

# Conflicts:
#	core/src/main/scala/kafka/log/Log.scala
ijuma added a commit to confluentinc/kafka that referenced this pull request Jun 3, 2020
* apache-github/2.6: (32 commits)
  KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests (apache#8786)
  KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used (apache#8737)
  KAFKA-9320: Enable TLSv1.3 by default (KIP-573) (apache#8695)
  KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (apache#8777)
  MINOR: Remove unused variable to fix spotBugs failure (apache#8779)
  MINOR: ChangelogReader should poll for duration 0 for standby restore (apache#8773)
  KAFKA-10030: Allow fetching a key from a single partition (apache#8706)
  Kafka-10064 Add documentation for KIP-571 (apache#8760)
  MINOR: Code cleanup and assertion message fixes in Connect integration tests (apache#8750)
  KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (apache#8668)
  KAFKA-9392; Clarify deleteAcls javadoc and add test for create/delete timing (apache#7956)
  KAFKA-10074: Improve performance of `matchingAcls` (apache#8769)
  KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723)
  KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739)
  KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705)
  KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749)
  KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238)
  KAFKA-9501: convert between active and standby without closing stores (apache#8248)
  MINOR: Relax Percentiles test (apache#8748)
  MINOR: regression test for task assignor config (apache#8743)
  ...
@ableegoldman ableegoldman deleted the KIP-441-fix-standby-to-active-transition branch June 26, 2020 22:37