
[FLINK-8360] Implement task-local state recovery #5239

Closed
wants to merge 36 commits into
base: master
from

Conversation

6 participants
@StefanRRichter
Contributor

StefanRRichter commented Jan 4, 2018

What is the purpose of the change

This change introduces the task-local recovery feature. The main idea is to have a secondary, local copy of the checkpointed state, while there is still a primary copy in DFS that we report to the checkpoint coordinator.

Recovery can attempt to restore from the secondary local copy, if available, to save network bandwidth. This requires that the assignment from tasks to slots is as sticky as possible.

For starters, we will implement this feature for all managed keyed states and can easily enhance it to all other state types (e.g. operator state) later, because the basic infrastructure is already in place. This PR is on top of #4745.

Brief change log

  • Introduced TaskExecutorLocalStateStoresManager per task manager. This class manages one TaskLocalStateStore for each task running on the task manager.
  • TaskLocalStateStore stores and provides the local state for one task. Reporting of checkpointed states goes through this class. The primary state handles are forwarded to the checkpoint coordinator, the optional secondary (local) state is stored in the local store.
  • LocalRecoveryDirectoryProvider is used by TaskLocalStateStore to manage the local state directory/ies.
  • StreamTaskStateManager uses the TaskLocalStateStore to restore state for its operators.
  • File-based local state is created through DuplicatingCheckpointOutputStream, a stream that duplicates writes into two internal streams - typically one primary against a DFS and one secondary against the local FS (a simplified sketch of this idea follows this list).
  • RocksDB's incremental checkpoints are not just based on one file, but on a directory. As we do not require reference counting for local files (we can use hardlinks), we can deal with checkpoint directories as a whole. We introduced IncrementalLocalKeyedStateHandle, which extends DirectoryKeyedStateHandle, for this purpose.
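
To make the duplicating-stream idea concrete, here is a minimal, hypothetical sketch (simplified names, not the actual Flink class): every write goes to the primary (DFS) stream, and the secondary (local) stream is treated as a best-effort copy that is simply dropped on failure.

import java.io.IOException;
import java.io.OutputStream;

// Simplified sketch of a duplicating checkpoint stream: the primary stream is
// authoritative, the secondary local stream only exists to enable local recovery.
public class DuplicatingOutputStreamSketch extends OutputStream {

    private final OutputStream primary;  // e.g. checkpoint stream against DFS
    private OutputStream secondary;      // e.g. stream against the local FS; may be dropped

    public DuplicatingOutputStreamSketch(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b); // failures of the primary stream fail the checkpoint
        if (secondary != null) {
            try {
                secondary.write(b);
            } catch (IOException e) {
                closeSecondaryQuietly(); // the local copy is only an optimization
            }
        }
    }

    @Override
    public void close() throws IOException {
        try {
            primary.close();
        } finally {
            closeSecondaryQuietly();
        }
    }

    private void closeSecondaryQuietly() {
        try {
            if (secondary != null) {
                secondary.close();
            }
        } catch (IOException ignored) {
            // ignore, the secondary copy is best-effort
        } finally {
            secondary = null;
        }
    }
}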

Verifying this change

This change added tests and can be verified as follows:

  • If we activate the local recovery feature on the state backend (via #setLocalRecoveryMode(...)) and introduce a task failure through a user-code exception, we should observe in the logs that managed keyed state is recovered from the local FS.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (yes, slightly and in a way that should not matter.)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (Documentation pending)
@StefanRRichter

This comment has been minimized.

Contributor

StefanRRichter commented Jan 4, 2018

@pnowojski

Today I only managed to review up to StateInitializationContextImpl.java from the second commit. I will continue on Monday.

@@ -145,8 +145,8 @@
private volatile Throwable failureCause; // once assigned, never changes
/** The handle to the state that the task gets on restore */
private volatile TaskStateSnapshot taskState;
/** Information to restore the task onr recovery, such as checkpoint id and task state snapshot */

This comment has been minimized.

@pnowojski

pnowojski Jan 5, 2018

Contributor

onr typo

This comment has been minimized.

@StefanRRichter
/** The handle to the state that the task gets on restore */
private volatile TaskStateSnapshot taskState;
/** Information to restore the task onr recovery, such as checkpoint id and task state snapshot */
private volatile TaskRestore taskRestore;

This comment has been minimized.

@pnowojski

pnowojski Jan 5, 2018

Contributor

I would add a SerializableOptional class, make this field a SerializableOptional<TaskRestore>, and pass it down to Task to avoid nulls.

If you do not like this, please at least add @Nullable annotations to this field and subsequent usages.

Or maybe embed isEmpty logic into TaskRestore.
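
For illustration, a minimal sketch of what such a SerializableOptional could look like (hypothetical and simplified, not necessarily how it would look in Flink): an Optional-like holder that implements Serializable, so it can travel with the deployment descriptor instead of a nullable field.

import java.io.Serializable;
import java.util.NoSuchElementException;

// Sketch: serializable optional wrapper to avoid passing raw nulls around.
public final class SerializableOptional<T extends Serializable> implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final SerializableOptional<?> EMPTY = new SerializableOptional<>(null);

    private final T value; // null means "empty"

    private SerializableOptional(T value) {
        this.value = value;
    }

    public static <T extends Serializable> SerializableOptional<T> of(T value) {
        if (value == null) {
            throw new NullPointerException("value must not be null");
        }
        return new SerializableOptional<>(value);
    }

    @SuppressWarnings("unchecked")
    public static <T extends Serializable> SerializableOptional<T> empty() {
        return (SerializableOptional<T>) EMPTY;
    }

    public boolean isPresent() {
        return value != null;
    }

    public T get() {
        if (value == null) {
            throw new NoSuchElementException("value is absent");
        }
        return value;
    }
}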

@@ -831,11 +834,12 @@ private StreamStateHandle materializeStateData(Path filePath) throws Exception {
return result;
} finally {
if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
if (closeableRegistry.unregisterCloseable(inputStream)) {

This comment has been minimized.

@pnowojski

pnowojski Jan 5, 2018

Contributor

Is this an unrelated refactoring/cleanup? If so, please move it to a separate commit. (Ditto for the other removals of xyz != null checks.)

This comment has been minimized.

@StefanRRichter
inputStream.close();
}
if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
if (closeableRegistry.unregisterCloseable(outputStream)) {

This comment has been minimized.

@pnowojski

pnowojski Jan 5, 2018

Contributor

Side note: I do not understand why there is a try-finally block with close() AND an additional closeableRegistry. Is closeableRegistry used for artificially interrupting reads/writes?

Anyway, could you add a comment explaining this to the closeableRegistry field declaration?

This comment has been minimized.

@StefanRRichter

StefanRRichter Jan 5, 2018

Contributor

Yes, that is what it is used for. In case of cancellation, we want all blocking operations to terminate as soon as possible using this mechanism.
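
To illustrate the pattern being discussed (a simplified, hypothetical sketch, not Flink's actual CloseableRegistry): streams that perform blocking I/O are registered centrally, and on cancellation the registry is closed, which closes every registered stream so that blocked reads and writes terminate quickly instead of hanging.

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Sketch of a cancellation registry for in-flight blocking I/O.
class CancelRegistrySketch implements Closeable {

    private final Set<Closeable> registered = new HashSet<>();
    private boolean closed;

    synchronized boolean register(Closeable c) {
        if (closed) {
            return false; // already cancelled; the caller should not start the operation
        }
        return registered.add(c);
    }

    synchronized boolean unregister(Closeable c) {
        return registered.remove(c);
    }

    @Override
    public synchronized void close() throws IOException {
        closed = true;
        for (Closeable c : registered) {
            try {
                c.close(); // interrupts blocking reads/writes on this stream
            } catch (IOException ignored) {
                // keep closing the remaining streams
            }
        }
        registered.clear();
    }
}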

This comment has been minimized.

@pnowojski

pnowojski Jan 8, 2018

Contributor

Maybe instead of commenting this, we could rename closeableRegistry to ongoingIoInteruptRegistry or ongoingIoCancellingRegistry

@@ -1041,7 +1045,13 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
if (!enableIncrementalCheckpointing) {

This comment has been minimized.

@pnowojski

pnowojski Jan 5, 2018

Contributor

?

  1. Is this change relevant to the commit? If not, please extract it into a separate one.
  2. Why do we not need to handle the notification for disabled incremental checkpointing? Doesn't that deserve some explanation in the form of a comment?

This comment has been minimized.

@StefanRRichter

This comment has been minimized.

@tillrohrmann

tillrohrmann Jan 31, 2018

Contributor

I think it would actually be easier to understand when written:

if (incrementalCheckpointingEnabled) {
  // do clean up work for local sst files which is only relevant for incremental checkpointing
}
* Operators, in turn, can use the context to initialize everything connected to their state, such as backends or
* a timer service manager.
*/
public interface StreamTaskStateManager {

This comment has been minimized.

@pnowojski

pnowojski Jan 5, 2018

Contributor

As we discussed, you could rename this class to StreamTaskStateInitializer and limit its lifetime.

This comment has been minimized.

@StefanRRichter
/**
* This interface must be implemented by functions/operations that want to receive
* a commit notification once a checkpoint has been completely acknowledged by all
* participants.
*/
@PublicEvolving

This comment has been minimized.

@pnowojski

pnowojski Jan 5, 2018

Contributor

Separate commit?

This comment has been minimized.

@StefanRRichter

StefanRRichter Jan 19, 2018

Contributor

Ideally yes, but I kept it here because there is no clear commit it belongs to, and I find it too trivial for a separate commit.

*/
void close() throws Exception;
void close() throws IOException;

This comment has been minimized.

@pnowojski

pnowojski Jan 5, 2018

Contributor

What's this change for? Is it used anywhere? It seems to me like close() is just dead code and this method could be dropped, or did I miss something?

This comment has been minimized.

@StephanEwen

StephanEwen Jan 9, 2018

Contributor

I think this method is actually removed in the latest master.

@@ -147,7 +147,7 @@ public void close() throws IOException {
@Override
public void dispose() {
IOUtils.closeQuietly(this);
IOUtils.closeQuietly(closeStreamOnCancelRegistry);

This comment has been minimized.

@pnowojski

pnowojski Jan 5, 2018

Contributor

Another irrelevant change?

@@ -1,104 +0,0 @@
/*

This comment has been minimized.

@pnowojski

pnowojski Jan 5, 2018

Contributor

That was dead code, right? Please extract the removal of this (and MultiStreamStateHandleTest) into a separate commit.

This comment has been minimized.

@StefanRRichter

StefanRRichter Jan 5, 2018

Contributor

Yes, they are deleted in later commits anyway.

@StephanEwen

This comment has been minimized.

Contributor

StephanEwen commented Jan 9, 2018

I did a peer review and walked through the code.
Overall, the design looks good, +1 for that!

Some comments:

  • I would argue to change the way that these recovery options are configured. Currently, this goes through methods on the state backend objects, i.e., configuration in code. Because the recovery aspect is really an "ops-related" aspect of running a Flink job (or a broader streaming platform), it should not be configured in code, but in the config. I found it helpful to think of settings in code as what concerns the application developers, and settings in the config as what concerns the people that run Flink. They may be the same person in the end, but even then it is helpful, because these concerns frequently arise in different stages of application development and deployment. Configurations are also easier to "standardize on", like "we want all applications in that group to enable local recovery".

  • One thing I am not yet 100% sure of is how this will interact in the future with RocksDB's optimized local recovery. I assume that checkpoints will in the future always use incremental snapshots. For those, there is no stream of bytes to store locally in addition. The files are already local and immutable. Here, the RocksDB snapshot should probably go directly through the local recovery directory, and the diff files would be persisted from there (the complete snapshot, which consists only of hardlinks to the files that are also in the work directory, would be retained though). Is the assumption that this is a "retain data structure" style mechanism, bespoke for each state backend, similar to retaining the heap copy-on-write table for the Heap State Backend?

Now, since this PR is already complicated and needs a heavy rebase, I would be okay with doing that in another PR, if there is commitment to do this soon (before the 1.5 release branch is cut).

Slightly off topic: This code has a very distinct style of using many @Nonnull annotations. Other, newer parts of the code (the ones that use annotations) follow the contract "non-null unless annotated with @Nullable". I am not asking to change this. It would be good to actually have a discussion and come up with a recommended style to agree on for the future.

@StefanRRichter

This comment has been minimized.

Contributor

StefanRRichter commented Jan 9, 2018

Thanks for going through the general design @StephanEwen ! As we discussed, I agree with your first point. For the second point about RocksDB, this PR already contains an optimized way to deal with incremental local checkpoints that we did not discuss in our review, because I thought it was too much of a low-level detail.
It does not work with duplicating streams. Instead, I introduced a state handle type for a local directory. In fact, I mapped the previous incremental recovery from DFS state to this new handle type as well: the DFS state is first downloaded and then it also simply becomes a local directory state handle. From there, both incremental recovery paths are identical. The approach also uses complete local checkpoints with hardlinks, as created by RocksDB's native snapshot mechanism.

Full snapshots work with duplicating streams.

@pnowojski

I have finished looking through the second commit. Thanks for the patience and sorry for the late response, but it took me quite some time to understand what this change is about, and I was already kind of overloaded with other reviews :(

import org.apache.flink.util.CloseableIterable;
/**
* This interface represents a context from which a stream operator can initialize everything connected state to

This comment has been minimized.

@pnowojski

pnowojski Jan 11, 2018

Contributor

Possibly a typo: s/"can initialize everything connected state to state"/"can initialize everything connected to state"/g

This comment has been minimized.

@StefanRRichter
return operatorStateBackend;
}
private <K> AbstractKeyedStateBackend<K> createKeyedStatedBackendFromJobManagerState(

This comment has been minimized.

@pnowojski

pnowojski Jan 11, 2018

Contributor

nit: could you mark methods as static if they are not using this variable?

backendCloseableRegistry);
}
protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(

This comment has been minimized.

@pnowojski

pnowojski Jan 11, 2018

Contributor

This is more of a general remark: can you name the methods getSomething() or createSomething() to communicate what they are actually doing? A couple of times I had to go to the implementation because it wasn't clear from the context (at least for me :( ) which one it is.

timeServiceManager = new InternalTimeServiceManager<>(
getKeyedStateBackend().getNumberOfKeyGroups(),
getKeyedStateBackend().getKeyGroupRange(),
final StreamOperatorStateContext context =

This comment has been minimized.

@pnowojski

pnowojski Jan 11, 2018

Contributor

Do we need this one additional level of indirection in the form of StreamOperatorStateContext? Couldn't streamTaskStateManager return the StateInitializationContext directly? Or do you think otherwise? (I'm not sure about this comment.)

This comment has been minimized.

@StefanRRichter

StefanRRichter Jan 19, 2018

Contributor

As we discussed, this is bridging between internal and user-facing interfaces. I would prefer to keep it as is, at least for now.

* @param stateCollectionFuture to be discarded
* @throws Exception if the discard operation failed
*/
public static void discardStateCollectionFuture(

This comment has been minimized.

@pnowojski

pnowojski Jan 11, 2018

Contributor

It seems like dead code, both in this commit and in the whole PR. Isn't it?

This comment has been minimized.

@StefanRRichter
KeyContext keyContext,
Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates) throws Exception {
return null;

This comment has been minimized.

@pnowojski

pnowojski Jan 11, 2018

Contributor

Does it make sense to return null in this method? Isn't this some kind of hack or a walkaround some underlying issue?

This comment has been minimized.

@StefanRRichter

StefanRRichter Jan 19, 2018

Contributor

Yes, this is a workaround, because this is a dedicated test for raw keyed state - which is otherwise consumed immediately to restore a timer service. That would fail because we only wrote test data to the stream, not the data for a timer service.

typeSerializer,
closeableRegistry);
verify(stateBackend).createKeyedStateBackend(

This comment has been minimized.

@pnowojski

pnowojski Jan 11, 2018

Contributor

Does it make sense to verify that those methods were called? Isn't it enough to check Assert.assertNotNull(keyedStateBackend); later on? I mean, if for example someone refactors and overloads the stateBackend.createKeyedStateBackend(...) method, this test will start to fail incorrectly. On the other hand, I do not see what those verify() calls give us.

Same applies to the second unit test in this file.

This comment has been minimized.

@StefanRRichter
CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = stateContext.rawOperatorStateInputs();
Assert.assertEquals(false, stateContext.isRestored());
Assert.assertNotNull(operatorStateBackend);

This comment has been minimized.

@pnowojski

pnowojski Jan 11, 2018

Contributor

Shouldn't you make some additional assertions on those fields besides checking for null? Or is it enough at this level to have just those not-null checks?

This comment has been minimized.

@StefanRRichter

StefanRRichter Jan 19, 2018

Contributor

This check is sufficient for this test.

env.getJobID(),
env.getExecutionId());
when(this.environment.getTaskStateManager()).thenReturn(this.taskStateManager);

This comment has been minimized.

@pnowojski

pnowojski Jan 11, 2018

Contributor

Imagine someone, in their test, creating the harness in the following way:

stateManager = new MyFancyTaskStateManager();
new AbstractStreamOperatorTestHarness(
    operator,
    new MockEnvironment(
        (...),
        stateManager));

and suddenly this line is replacing their MyFancyTaskStateManager in some of the usages (but maybe not all of them). Here MyFancyTaskStateManager means anything that is not the same instance with the same behaviour as the one which you return here (which kind of defeats the purpose, since either this field is already exactly what you want it to be, or something that shouldn't even exist).

Rephrasing this problem: what this line is doing right here is using reflection calls to override MockEnvironment's private final field (private final TaskStateManager taskStateManager;), overriding its original value.

Maybe this problem could also go away if you split the TaskStateManager as I mentioned earlier.

If not, please refactor this code/the users of this constructor so that they pass a correctly set up TaskStateManager. Also please note that 99% of the users of this constructor are using MockEnvironment, which was passed to this method with the intention of the test harness taking over ownership of the MockEnvironment. In other words, I would bet that all of those problems would go away if the MockEnvironment was created locally instead of being injected. The only exception to this 99% are three calls in AsyncWaitOperatorTest, which actually should be quite easy to refactor to follow the same code path and use MockEnvironment.

This comment has been minimized.

@StefanRRichter

StefanRRichter Jan 19, 2018

Contributor

👍 I made all tests use the MockEnvironment.

@@ -85,8 +84,24 @@ public String map(Tuple2<String, Integer> value) throws Exception {
fail();
}
catch (JobExecutionException e) {
boolean success = false;

This comment has been minimized.

@pnowojski

pnowojski Jan 11, 2018

Contributor

assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());

This comment has been minimized.

@StefanRRichter
@tillrohrmann

Minor comment concerning JavaDocs.

@@ -1016,7 +1017,7 @@ void releaseResources(boolean canceled) {
}
@Override
public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
public void restore(StateObjectCollection<KeyedStateHandle> restoreState) throws Exception {

This comment has been minimized.

@tillrohrmann

tillrohrmann Jan 31, 2018

Contributor

Why is this necessary?

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 14, 2018

Contributor

Will change it back to Collection.

@@ -325,8 +325,7 @@ public OperatorStateHandle performOperation() throws Exception {
return task;
}
@Override
public void restore(Collection<OperatorStateHandle> restoreSnapshots) throws Exception {
public void restore(StateObjectCollection<OperatorStateHandle> restoreSnapshots) throws Exception {

This comment has been minimized.

@tillrohrmann

tillrohrmann Jan 31, 2018

Contributor

Why can't it remain a Collection?

This comment has been minimized.

@StefanRRichter

StefanRRichter Jan 31, 2018

Contributor

Because there were a couple of static helper methods operating on those collections, e.g. disposing a collection of state handles or checking if the collection as a whole has any state. IMO those things should go into a separate class and not be static helpers over a generic collection.

This comment has been minimized.

@StefanRRichter

StefanRRichter Jan 31, 2018

Contributor

Ok, if you are just talking about this particular usage, I might double-check whether the interface is enough.

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

I couldn't find any call sites of these methods in the implementations of Snapshotable#restore. But I only skimmed over it.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 14, 2018

Contributor

True, it can remain a Collection.

@@ -54,5 +54,5 @@
*
* @param state the old state to restore.
*/
void restore(Collection<S> state) throws Exception;
void restore(R state) throws Exception;

This comment has been minimized.

@tillrohrmann

tillrohrmann Jan 31, 2018

Contributor

To me it is not really clear why you've made this change here. Code-wise it does not seem necessary.

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

This comment has been minimized.

@tillrohrmann

tillrohrmann Jan 31, 2018

Contributor

The license header and the package declaration seem to be in the wrong order.

This comment has been minimized.

@StefanRRichter
private static final long serialVersionUID = 1L;
private final T jobManagerOwnedSnapshot;
private final T taskLocalSnapshot;

This comment has been minimized.

@tillrohrmann

tillrohrmann Jan 31, 2018

Contributor

Maybe annotate as Nullable.

/**
*
*/
public class SnapshotResult<T extends StateObject> implements StateObject {

This comment has been minimized.

@tillrohrmann

tillrohrmann Jan 31, 2018

Contributor

I'm wondering whether it wouldn't make sense to have a SnapshotResult and a subclass SnapshotResultWithLocalSnapshot. The latter has an additional field for the taskLocalSnapshot. That way it's much clearer how to initialize these structures. Ideally the arguments are both not null.

This comment has been minimized.

@tillrohrmann

tillrohrmann Jan 31, 2018

Contributor

That way we would also get rid of all the null checks.
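
As an illustration of this direction (a hypothetical sketch with simplified names, not the final Flink code), the result object itself could always be non-null and only expose optional parts, created via factory methods:

// Sketch: a snapshot result that is created through factories, so callers never
// hand-construct "half-empty" results and never have to null-check the result itself.
public class SnapshotResultSketch<T> {

    private static final SnapshotResultSketch<?> EMPTY = new SnapshotResultSketch<>(null, null);

    private final T jobManagerOwnedSnapshot; // null only in the EMPTY result
    private final T taskLocalSnapshot;       // null if no local copy was written

    private SnapshotResultSketch(T jobManagerOwned, T taskLocal) {
        this.jobManagerOwnedSnapshot = jobManagerOwned;
        this.taskLocalSnapshot = taskLocal;
    }

    /** Snapshot that only exists in the primary (DFS) location. */
    public static <T> SnapshotResultSketch<T> of(T jobManagerOwned) {
        return new SnapshotResultSketch<>(jobManagerOwned, null);
    }

    /** Snapshot with an additional task-local copy for local recovery. */
    public static <T> SnapshotResultSketch<T> withLocalState(T jobManagerOwned, T taskLocal) {
        return new SnapshotResultSketch<>(jobManagerOwned, taskLocal);
    }

    /** Empty result, e.g. for an operator without state. */
    @SuppressWarnings("unchecked")
    public static <T> SnapshotResultSketch<T> empty() {
        return (SnapshotResultSketch<T>) EMPTY;
    }

    public T getJobManagerOwnedSnapshot() {
        return jobManagerOwnedSnapshot;
    }

    public T getTaskLocalSnapshot() {
        return taskLocalSnapshot;
    }
}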

@@ -130,7 +136,7 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
}
private <T extends StreamStateHandle> void closeAndUnregisterStream(
NonClosingCheckpointOutputStream<T> stream) throws IOException {
NonClosingCheckpointOutputStream<? extends T> stream) throws IOException {

This comment has been minimized.

@tillrohrmann

tillrohrmann Jan 31, 2018

Contributor

Should be sufficient to have NonClosingCheckpointOutputStream<? extends StreamStateHandle>

This comment has been minimized.

@StefanRRichter
private <T extends StateObject> RunnableFuture<SnapshotResult<T>> getGenericStateStreamFuture(
NonClosingCheckpointOutputStream<? extends T> stream) throws IOException {
T operatorStateHandle = (T) closeAndUnregisterStreamToObtainStateHandle(stream);

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

This cast seems a bit fishy to me. I think it should not be necessary if the generics are applied correctly. A way to solve it would be T extends StreamStateHandle and RunnableFuture<? extends SnapshotResult<? extends KeyedStateHandle>> getKeyedStateStreamFuture()

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 14, 2018

Contributor

I solved it slightly differently. The change you proposed would cascade into other methods and fields.

@@ -47,10 +47,11 @@
* @param checkpointMetrics task level metrics for the checkpoint.
* @param acknowledgedState the reported states from the owning task.

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

JavaDocs not adapted.

This comment has been minimized.

@StefanRRichter
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpoint());
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
OperatorStateHandle stateHandle = snapshotResult != null ? snapshotResult.getJobManagerOwnedSnapshot() : null;

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

This seems a bit cumbersome. Can't we make it so that snapshotResult is never null?

This comment has been minimized.

@StefanRRichter
if(!snapshotRunnableFuture.isDone()) {
Thread runner = new Thread(snapshotRunnableFuture);
runner.start();
}
return snapshotRunnableFuture.get();
SnapshotResult<KeyedStateHandle> snapshotResult = snapshotRunnableFuture.get();

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

Why are we spawning a new Thread if we block the current thread here? Wouldn't it be better to simply call run on the RunnableFuture if it is not yet done?
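
A sketch of the suggested simplification (assuming the surrounding code only needs the result synchronously): just run the future in the calling thread, which is essentially what the FutureUtil.runIfNotDoneAndGet helper referenced earlier in this thread does.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;

// Sketch: no helper thread is needed when we block on the result anyway.
static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {
    if (!future.isDone()) {
        future.run(); // executes the snapshot work synchronously in this thread
    }
    return future.get();
}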

This comment has been minimized.

@StefanRRichter
return snapshotRunnableFuture.get();
SnapshotResult<KeyedStateHandle> snapshotResult = snapshotRunnableFuture.get();
return snapshotResult != null ? snapshotResult.getJobManagerOwnedSnapshot() : null;

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

By enforcing that snapshotResult is always not null, we could get rid of all these ternary statements.

This comment has been minimized.

@StefanRRichter
}
private <T extends StateObject> T extractJobManagerOwned(SnapshotResult<T> snapshotResult) {
return snapshotResult != null ? snapshotResult.getJobManagerOwnedSnapshot() : null;

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

Same here with the null values.

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

We should add @Nullable annotation to make sure that this method can return a null value.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 14, 2018

Contributor

This method is no longer required, with the changes from other comments that make sure SnapshotResult is never null.

}
private void handleExecutionException(Exception e) {
// the state is completed if an exception occurred in the acknowledgeCheckpoint call

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

comment needs to be adapted.

This comment has been minimized.

@StefanRRichter
@@ -46,20 +47,28 @@ public void testCancelAndCleanup() throws Exception {
operatorSnapshotResult.cancel();
KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class);
RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class);
when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture = mock(RunnableFuture.class);

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

Why not use a DoneFuture instead of mocking?

This comment has been minimized.

@StefanRRichter
}
if (asyncCheckpointState.compareAndSet(
CheckpointingOperation.AsynCheckpointState.RUNNING,
CheckpointingOperation.AsynCheckpointState.DISCARDED)) {

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

This could be asyncCheckpointState.compareAndSet(currentAsyncCheckpointState, DISCARDED). Then we could remove the compare and set above this if condition.

This comment has been minimized.

@StefanRRichter
@@ -227,21 +229,13 @@ protected void cleanup() throws Exception {
// has been stopped
CLEANUP_LATCH.trigger();
// wait until handle async exception has been called to proceed with the termination of the
// StreamTask
HANDLE_ASYNC_EXCEPTION_LATCH.await();

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

This handle should be deleted since it is no longer needed.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 14, 2018

Contributor

Already done in a later commit.

// StreamTask
HANDLE_ASYNC_EXCEPTION_LATCH.await();
// wait until all async checkpoint threads are terminated, so that no more exceptions can be reported
Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, TimeUnit.SECONDS));

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

Where do we trigger the shutdown of the thread pool returned by getAsyncOperationsThreadPool()?

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 14, 2018

Contributor

It happens through the StreamTask, before the cleanups.

this.reportedCheckpointId = -1L;
this.subtaskLocalStateBaseDirectory =
new File(System.getProperty("java.io.tmpdir"), "testLocalState_" + UUID.randomUUID());

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

Shouldn't this come from something like a TemporaryFolder, such that the directory gets cleaned up in all cases? Consequently, maybe we should pass in the subtaskLocalStateBaseDirectory from the test using this class.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 14, 2018

Contributor

This is already changed in later commits.

import static org.mockito.Mockito.spy;
/**
* Helper class with a method that attempt to automatically test method forwarding between a delegate and a wrapper.

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

"attempts" or "with methods"

This comment has been minimized.

@StefanRRichter
reset(delegate);
} catch (Exception ex) {
ex.printStackTrace();
fail("Forwarding test failed: " + ex.getMessage());

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

I think we should let the exception propagate.

This comment has been minimized.

@StefanRRichter
/**
* Tests for {@link StateObjectCollection}.
*/
public class StateObjectCollectionTest {

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

should extend TestLogger.

This comment has been minimized.

@StefanRRichter
/**
* Test for forwarding of state reporting to and from {@link org.apache.flink.runtime.state.TaskStateManager}.
*/
public class LocalStateForwardingTest {

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

Same here with TestLogger.

This comment has been minimized.

@StefanRRichter
if (parameterType.isArray()) {
arguments[j] = Array.newInstance(parameterType.getComponentType(), 0);
} else if (parameterType.isPrimitive()) {
arguments[j] = 0;

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 1, 2018

Contributor

Why is a primitive always of type int?

This comment has been minimized.

@StefanRRichter
// Key-groups should match.
BiFunction<KeyedStateHandle, KeyedStateHandle, Boolean> keyedStateApprover =
(ref, alt) -> ref.getKeyGroupRange().equals(alt.getKeyGroupRange());

This comment has been minimized.

@sihuazhou

sihuazhou Feb 16, 2018

Contributor

Why do ref.getKeyGroupRange() and alt.getKeyGroupRange() need to be strictly equal? It seems that alt.getKeyGroupRange().begin <= ref.getKeyGroupRange().begin && alt.getKeyGroupRange().end >= ref.getKeyGroupRange().end would also be acceptable.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 16, 2018

Contributor

In theory, we could be less strict, yes. But in practice it doesn't matter because rescaling is not supported right now. See my other comment.

|| alternativesByPriority.isEmpty()
|| !jobManagerState.hasState()
|| jobManagerState.size() != 1) {

This comment has been minimized.

@sihuazhou

sihuazhou Feb 16, 2018

Contributor

This means that the current local recovery doesn't support rescaling?

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 16, 2018

Contributor

Exactly, local recovery is for checkpoints, not savepoints. Rescaling from a checkpoint/savepoint involves a restart of the job, which means that all local state is cleaned up anyway because the job was canceled.

This comment has been minimized.

@sihuazhou

sihuazhou Feb 16, 2018

Contributor

Aha, got it. I wonder if someday Flink will support online rescaling... but that is another topic, ignore me...

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 16, 2018

Contributor

That might be, but there is still no fundamental problem with the approach if we change the matching/creation of the effective state handles in the future, once it is required. For the time being, keeping it simple keeps maintenance simpler.

This comment has been minimized.

@sihuazhou

sihuazhou Feb 16, 2018

Contributor

Agreed.

restoreKeyGroupsInStateHandle();
if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
heap.offer(currentSubIterator);
currentSubIterator

This comment has been minimized.

@sihuazhou

sihuazhou Feb 17, 2018

Contributor

I know this part is an old code block and wasn't changed in this PR. But it seems a bit odd to me: kvStateInformation is not a copied object and it can be changed while writeKVStateMetaData() is being invoked... If I am not wrong, this is a serious bug. Am I misunderstanding something?

This comment has been minimized.

@sihuazhou

sihuazhou Feb 17, 2018

Contributor

Strange, sorry - my comment does not match the highlighted lines when the line number is > 1400; the problem I described above is on line 1878:

private void writeKVStateMetaData() throws IOException {
  // ...
        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
				stateBackend.kvStateInformation.entrySet()) {
        }
  //...
}

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 19, 2018

Contributor

Yes, I remember that I fixed exactly this in the heap backend as well. A defensive copy of the map in the synchronous part is the fix. I will just do this as a hotfix.
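
For illustration, a minimal sketch of the fix being discussed (simplified names, not the actual Flink code): copy the mutable kvStateInformation map during the synchronous snapshot phase, so the asynchronous phase that writes the key/value state metadata iterates over a stable view even if new states are registered while the snapshot is still running.

import java.util.HashMap;
import java.util.Map;

// Sketch: executed in the synchronous part of the checkpoint, under the backend's lock.
static <K, V> Map<K, V> defensiveCopyForAsyncSnapshot(Map<K, V> kvStateInformation) {
    return new HashMap<>(kvStateInformation);
}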

This comment has been minimized.

@sihuazhou

sihuazhou Feb 19, 2018

Contributor

Aha, I have made a PR for this ... could you please have a look at 5525? I fixed it in the way you pointed out.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 19, 2018

Contributor

Thanks, but like I said, I have already done this in a hotfix that I will commit.

This comment has been minimized.

@sihuazhou

sihuazhou Feb 19, 2018

Contributor

Aha, closing it.

try (RocksDB restoreDb = stateBackend.openDB(
restoreInstancePath.getPath(),
columnFamilyDescriptors,
columnFamilyHandles)) {

This comment has been minimized.

@sihuazhou

sihuazhou Feb 21, 2018

Contributor

This can be optimized. If a restoreInstance satisfies startKeyGroup >= backend.keygroup.startKeyGroup() && endKeyGroup <= backend.keygroup.endKeyGroup(), we can open it directly as the target RocksDB and use it for the rest of the recovery. (For now, we have to iterate over all restored RocksDB instances.)

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 21, 2018

Contributor

That might be true, but again this comment has nothing to do with this PR - only the indentation was changed. I would strongly ask that this PR does not become a collection of all possible suggestions for other changes. Opening a JIRA for this idea would be much better, because this thread is already very long, and posting unrelated suggestions here makes things more confusing and increases the likelihood that something will be overlooked. Would that be ok?

This comment has been minimized.

@sihuazhou

sihuazhou Feb 21, 2018

Contributor

Got it, ok!

@tillrohrmann

Reviewed e940f46. I had some comments concerning serializability of the newly introduced state handles.

@@ -1552,10 +1556,33 @@ public IncrementalSnapshotStrategy() {
return DoneFuture.nullValue();
}
SnapshotDirectory snapshotDirectory;

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 21, 2018

Contributor

Could be made final

This comment has been minimized.

@StefanRRichter
return new SnapshotResult<>(incrementalKeyedStateHandle, null);
return new SnapshotResult<>(incrementalKeyedStateHandle, directoryKeyedStateHandle);

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 21, 2018

Contributor

I guess you changed the creation of the SnapshotResult in a fixup commit in your local branch, right? Otherwise we might refactor this in order to get rid of the many nulls in the lines above.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 22, 2018

Contributor

Exactly 👍

* present stream state handles.
*/
static SnapshotResult<KeyedStateHandle> toKeyedStateHandleSnapshotResult(
@Nullable SnapshotResult<StreamStateHandle> snapshotResult,

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 21, 2018

Contributor

I think this parameter should not be Nullable. If it is null, then we don't have to call this method.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 22, 2018

Contributor

Already changed in my updated branch.

@Override
public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
return keyGroupRange.getIntersection(keyGroupRange).getNumberOfKeyGroups() > 0 ? this : null;

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 21, 2018

Contributor

This looks like a bug: in keyGroupRange.getIntersection(keyGroupRange), the parameter is intersected with itself, whereas one side should be the handle's own key-group range. I think in general it is a good idea not to shadow fields with function parameters.
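
A sketch of the corrected method, assuming the class has a keyGroupRange field that is shadowed by the parameter (method and type names taken from the quoted diff):

// Sketch: intersect the handle's own range with the requested one,
// instead of intersecting the parameter with itself.
@Override
public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
    return this.keyGroupRange.getIntersection(keyGroupRange).getNumberOfKeyGroups() > 0 ? this : null;
}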

This comment has been minimized.

@StefanRRichter
/** The filesystem that contains the directory described by path. */
@Nonnull
private final FileSystem fileSystem;

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 21, 2018

Contributor

FileSystem is not serializable and should be removed from the DirectoryStateHandle. directory should give you all you need.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 22, 2018

Contributor

That is true; it just was not a problem because this handle is never serialized, it is only forced to inherit this from StateObject. 👍

This comment has been minimized.

@StefanRRichter
/** The filesystem that contains the snapshot directory. */
@Nonnull
private final FileSystem fileSystem;

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 21, 2018

Contributor

Do we need FileSystem if we have directory which is of type Path?

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 22, 2018

Contributor

In this class, I would keep it because it is used in several operations, and obtaining the FileSystem from the Path looks like it might not be a cheap operation. So this is more like a cache, or for the case that you already have the FileSystem when constructing the object.

}
public boolean isSnapshotOngoing() {
return State.ONGOING.equals(state.get());

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 21, 2018

Contributor

enum values should be compared using ==.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 22, 2018

Contributor

I think this is only a question of personal style; there is no guideline I am aware of that says this. It does basically the same thing if equals is called on the constant.

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 22, 2018

Contributor

I actually think that == is strictly better than equals and thus should be used. It has the following advantages:

  • It never throws NullPointerExceptions
  • It gives you type checks at compile time
  • It is faster because equals simply defers to == internally

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 22, 2018

Contributor

I think:

  • This also never throws an NPE if you call equals on the constant (that is why I emphasized this).
  • True.
  • It can be optimized to the same thing.

Pro points I see for the other variant:

  • It does not make you think, even for a second, whether == is correct in the comparison.
  • It still works correctly after refactoring, e.g. when the type is changed from an enum to String constants and somebody missed adjusting the statement.

Anyway, I will not fight over such a minor thing and can just change it ;)

*/
public static SnapshotDirectory temporary(Path path) throws IOException {
return new SnapshotDirectory(path) {

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 21, 2018

Contributor

Not sure whether an anonymous class is strictly necessary here.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 22, 2018

Contributor

I have changed that in my review-comments branch already. I made SnapshotDirectory itself abstract in the method completeSnapshotAndGetHandle, with two implementations (temporary, permanent) that each have their own static factory method.

@tillrohrmann

Reviewed c77c97c

@tillrohrmann

Reviewed a4181dd

@tillrohrmann

Reviewed 22b53c3

@tillrohrmann

Reviewed b8d0c40


@@ -89,6 +90,22 @@ public TaskExecutorLocalStateStoresManager(
}
}
}
// install a shutdown hook
this.shutdownHook = new Thread("TaskExecutorLocalStateStoresManager shutdown hook") {

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 22, 2018

Contributor

It might make sense to reuse BlobUtils#addShutdownHook or to refactor this and then reuse.

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 23, 2018

Contributor

I made a separate commit that introduces ShutdownHookUtil and used it to deduplicate the same code in something like 20 places...
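
For context, a minimal sketch of what such a shutdown-hook utility typically does (simplified and hypothetical, not the actual Flink class): register a JVM shutdown hook that runs a cleanup action for a named service, and allow removing it again on a regular shutdown.

// Sketch of a small shutdown-hook helper used to clean up local state directories etc.
final class ShutdownHookSketch {

    static Thread addShutdownHook(Runnable cleanup, String serviceName) {
        Thread hook = new Thread(cleanup, serviceName + " shutdown hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    static void removeShutdownHook(Thread hook) {
        try {
            Runtime.getRuntime().removeShutdownHook(hook);
        } catch (IllegalStateException ignored) {
            // JVM shutdown already in progress; the hook will run anyway
        }
    }
}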

@@ -193,7 +193,12 @@ public static TaskManagerServicesConfiguration fromConfiguration(
}
final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
final String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);
String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 22, 2018

Contributor

Could also be final

// we use local recovery when it is activated and we are not taking a savepoint.
return RocksDBStateBackend.LocalRecoveryMode.ENABLE_FILE_BASED.equals(recoveryMode)
return LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.equals(recoveryMode)

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 22, 2018

Contributor

I prefer == for enum comparisons, but we had this discussion before ;-)

This comment has been minimized.

@StefanRRichter
}
}
throw new ParseException("Cannot parse input to LocalRecoveryMode: " + input);
}

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 22, 2018

Contributor

Why not simply use LocalRecoveryMode.valueOf()?
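
A sketch of that suggestion (assuming the surrounding parse method and the ParseException type shown in the quoted code): let the generated enum valueOf(...) do the matching and only translate its IllegalArgumentException.

import java.util.Locale;

// Sketch: parse a LocalRecoveryMode via the enum's own valueOf.
static LocalRecoveryMode parseLocalRecoveryMode(String input) throws ParseException {
    try {
        return LocalRecoveryMode.valueOf(input.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
        throw new ParseException("Cannot parse input to LocalRecoveryMode: " + input);
    }
}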

This comment has been minimized.

@StefanRRichter
/** The local recovery mode. */
@Nonnull
private final LocalRecoveryMode localRecoveryMode;

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 22, 2018

Contributor

This is more of a general question than a review comment: how will it look in the future if we want to support non-file-based local recovery, e.g. writing some things to a local DB? Then we would not need the LocalRecoveryDirectoryProvider and we would have to adapt how we create the duplicating streams. So I'm wondering whether we should not have something like a LocalStateStreamFactory instead. For file-based recovery it could wrap the LocalRecoveryDirectoryProvider to select the next file to write into. What are the plans for that in the future?

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 23, 2018

Contributor

This is a bit tricky. I could see that sometimes the stream abstraction might not be enough for all (optimized) types of local snapshotting, and a directory is more powerful in that respect. For the time being, I think we should keep it like this and adjust once we have more concrete requirements that call for a different generalization.

@@ -46,9 +48,15 @@ public void testCreationFromConfig() throws Exception {
final Configuration config = new Configuration();
final String rootDirString = "localStateRoot1,localStateRoot2,localStateRoot3";
String tmpDir = System.getProperty("java.io.tmpdir") + File.separator;

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 22, 2018

Contributor

I think it's better to use a TemporaryFolder if any files are created, to ensure proper cleanup at the end of the test.

This comment has been minimized.

@StefanRRichter
new LocalRecoveryDirectoryProviderImpl(rootDirs, jobID, allocationID, jobVertexID, subtaskIdx);
LocalRecoveryConfig localRecoveryConfig =
new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider);

This comment has been minimized.

@tillrohrmann

tillrohrmann Feb 22, 2018

Contributor

It's a bit strange that we have to pass in a directoryProvider, even though the LocalRecoveryMode is disabled. I think it would be better to have something like LocalRecoveryConfig.disabled() and LocalRecoveryConfig.fileBased(directoryProvider).

This comment has been minimized.

@StefanRRichter

StefanRRichter Feb 23, 2018

Contributor

So wouldn't that mean that with LocalRecoveryConfig.disabled(), the directory provider is again nullable? I think LocalRecoveryDirectoryProvider is a very lightweight object that also always has a useful default configuration.
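
For illustration, a hypothetical sketch of the factory-method idea from this discussion (simplified; LocalRecoveryDirectoryProvider is the interface from the surrounding code, everything else is made up for the example):

// Sketch: a config that can only be built through explicit factories, so a
// "disabled" config is never accidentally constructed with a directory provider.
final class LocalRecoveryConfigSketch {

    enum LocalRecoveryMode { DISABLED, ENABLE_FILE_BASED }

    private final LocalRecoveryMode mode;
    private final LocalRecoveryDirectoryProvider directoryProvider; // null iff DISABLED

    private LocalRecoveryConfigSketch(LocalRecoveryMode mode, LocalRecoveryDirectoryProvider provider) {
        this.mode = mode;
        this.directoryProvider = provider;
    }

    static LocalRecoveryConfigSketch disabled() {
        return new LocalRecoveryConfigSketch(LocalRecoveryMode.DISABLED, null);
    }

    static LocalRecoveryConfigSketch fileBased(LocalRecoveryDirectoryProvider provider) {
        return new LocalRecoveryConfigSketch(LocalRecoveryMode.ENABLE_FILE_BASED, provider);
    }

    boolean isLocalRecoveryEnabled() {
        return mode == LocalRecoveryMode.ENABLE_FILE_BASED;
    }

    LocalRecoveryDirectoryProvider getDirectoryProvider() {
        return directoryProvider;
    }
}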