[FLINK-5820] [state backends] Split shared/exclusive state and properly handle disposal #5396

Closed
wants to merge 19 commits into from

Contributor

@StephanEwen StephanEwen commented Jan 31, 2018

What is the purpose of the change

This PR contains the final changes needed for [FLINK-5820]. Disposal of checkpoint directories now works properly across all file system types (it previously did not work properly for some S3 connectors) and issues fewer calls to the file systems. Shared and exclusive state are split into different directories, to help implement cleanup safety nets.

Brief change log

  1. TaskManagers use the CheckpointStorage to create CheckpointStreamFactories. Previously, these stream factories were created by the StateBackend. This completes the separation of the "storage" aspect of the StateBackend into the CheckpointStorage.

  2. The location in which to store state is communicated between the CheckpointCoordinator (which instantiates the original CheckpointStorageLocation for a checkpoint/savepoint) and the Tasks in a unified manner. Tasks always obtain their CheckpointStreamFactories in the same way, regardless of whether they write state for checkpoints or savepoints.

  3. Checkpoint state now has the scope EXCLUSIVE or SHARED, which may be stored differently. The current file system based backends put shared state into a /shared directory, while exclusive state goes into the /chk-1234 directory.

  4. Tasks can directly write task-owned state to a checkpoint storage. That state neither belongs specifically to one checkpoint, nor is it shared and eventually released by the Checkpoint Coordinator. Only the tasks themselves may release the state. An example of that type of state is the write-ahead logs created by some sinks.

  5. When a checkpoint is finalized, its storage is described by a CompletedCheckpointStorageLocation. That object gives access to addressing and metadata, and handles disposal of the location. This allows us to drop the "delete parent if empty" logic in File State Handles and fixes the issue that checkpoint directories are currently left over on S3.
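
To illustrate item 3, here is a minimal sketch of how a file system based storage might map a state scope to a directory. The class `ScopedCheckpointPaths` and its method are hypothetical and are not the classes touched by this PR; only the scope names and the `shared` / `chk-<id>` directory names come from the description above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical sketch of scope-based directory resolution; not Flink's actual classes. */
final class ScopedCheckpointPaths {

    /** The two scopes named in the change log. */
    enum Scope { EXCLUSIVE, SHARED }

    private final Path checkpointBaseDir;

    ScopedCheckpointPaths(Path checkpointBaseDir) {
        this.checkpointBaseDir = checkpointBaseDir;
    }

    /** Resolves the directory into which state of the given scope is written. */
    Path directoryFor(long checkpointId, Scope scope) {
        switch (scope) {
            case SHARED:
                // Shared state lives in one directory that outlives single checkpoints.
                return checkpointBaseDir.resolve("shared");
            case EXCLUSIVE:
                // Exclusive state lives in the directory of its own checkpoint.
                return checkpointBaseDir.resolve("chk-" + checkpointId);
            default:
                throw new IllegalArgumentException("Unknown scope: " + scope);
        }
    }

    public static void main(String[] args) {
        // The base directory is an assumed example path.
        ScopedCheckpointPaths paths = new ScopedCheckpointPaths(Paths.get("checkpoints", "job-a"));
        System.out.println(paths.directoryFor(1234, Scope.EXCLUSIVE));
        System.out.println(paths.directoryFor(1234, Scope.SHARED));
    }
}
```

Keeping the two scopes in separate directories means that everything under a `chk-<id>` directory belongs to exactly one checkpoint, which is what makes directory-level cleanup possible.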

Future Work

  • In the future, the CompletedCheckpointStorageLocation should also be used as a way to handle relative addressing of checkpoints, to allow users to move them to different directories without breaking the internal paths.

  • We can now implement disposal fast paths, such as dropping a directory as a whole rather than dropping each state object separately. However, one would still need to release shared state objects individually. Finishing these fast paths is currently blocked on some rework of the shared state handles, to make their selective release easier and more robust.
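
As a rough illustration of the fast path mentioned above, the following sketch deletes the exclusive directory of one checkpoint as a whole and leaves the shared directory untouched. It uses plain `java.nio.file` on a local file system rather than Flink's file system abstraction, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of a directory-level disposal fast path; not Flink's implementation. */
final class DirectoryDisposalSketch {

    /** Deletes the exclusive directory of one checkpoint and everything below it. */
    static void disposeExclusiveDirectory(Path exclusiveDir) throws IOException {
        if (!Files.exists(exclusiveDir)) {
            return;
        }
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(exclusiveDir)) {
            // Reverse order puts children before their parents, so directories are empty when deleted.
            toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : toDelete) {
            Files.delete(path);
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("checkpoints");
        Path shared = Files.createDirectories(base.resolve("shared"));
        Path exclusive = Files.createDirectories(base.resolve("chk-1"));
        Files.write(shared.resolve("shared-state"), new byte[] {1});
        Files.write(exclusive.resolve("exclusive-state"), new byte[] {2});

        disposeExclusiveDirectory(exclusive);

        System.out.println("exclusive dir exists: " + Files.exists(exclusive));
        System.out.println("shared state exists: " + Files.exists(shared.resolve("shared-state")));
    }
}
```

Shared state objects are deliberately not touched here; as noted above, they would still need to be released individually.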

Verifying this change

This change can be verified by running a Flink cluster with a checkpointed program and

This PR also adds and adjusts various unit tests to guard the new behavior.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? Somewhat (it changes the state backend directory layouts)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Because all checkpoints are now externalized (write their metadata) this is an obsolete test.
…onReference instead of String to communicate the location
…tory from CheckpointStorage and Checkpoint Location Reference to persist checkpoint data.
… Storage and Checkpoint Stream to separate tests suites
…orageLocation to properly handle disposal of checkpoints.

That concept allows us to properly handle deletion of a checkpoint storage, for example deleting checkpoint
directories, or the dropping of a checkpoint specific table.

This replaces the current workaround for file systems, where every file disposal checks if the parent
directory is now empty, and deletes it if that is the case. That is not only inefficient, but
prohibitively expensive on some systems, like Amazon S3.
…am to FsCheckpointMetadataOutputStream

The new name captures the proper use and meaning of the class in a better way.
…ean up their parent directory.

Performing directory contents checks and cleaning up the parent directory in the state handle disposal
has previously led to excessive file system metadata requests, which especially on systems like
Amazon S3 is prohibitively expensive.
Contributor

@aljoscha aljoscha left a comment

These are excellent changes! 👍

I had some comments and questions inline.

/**
* The configuration of a checkpoint, such as whether
* The configuration of a checkpoint. This described whether
Contributor

nit: typo


private final boolean forced;
/** Type - checkpoit / savepoint. */
Contributor

nit: typo

File file = tmpFolder.newFile();
assertEquals(true, file.exists());

public void registerStatesAtRegistry() {
Contributor

What's the reason for this change?

Contributor Author

The test for whether state handles are correctly registered at the SharedStateRegistry was originally just sneakily added to a pre-existing metadata file cleanup test. That did not seem right ;-)

This factors the test out into a separate method. The test method should be called testRegisterStatesAtRegistry instead of registerStatesAtRegistry. Will change that...


stream2.close();

verify(fs, times(0)).delete(any(Path.class), anyBoolean());
Contributor

nit: This seems somewhat brittle because there could be another "delete" method that the handle uses to delete the parent dir. For "future proof-ness"...

Contributor Author

Will add an additional check that the directory still exists
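
A minimal sketch of what such a check could look like, written against plain `java.nio.file` instead of the mocked Flink file system used in the actual test. The `discardFile` method is a hypothetical stand-in for the state handle's disposal and is not code from this PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of a "parent directory still exists" check; not the test from this PR. */
final class ParentDirectoryCheckSketch {

    /** Stand-in for a file state handle's disposal: deletes only the file itself. */
    static void discardFile(Path file) throws IOException {
        Files.deleteIfExists(file);
    }

    /** Returns true if discarding the only file in a directory leaves the directory in place. */
    static boolean parentSurvivesDisposal() throws IOException {
        Path dir = Files.createTempDirectory("chk-test");
        Path file = Files.createFile(dir.resolve("state-file"));

        discardFile(file);

        boolean fileGone = !Files.exists(file);
        boolean dirStillThere = Files.isDirectory(dir);

        // Clean up the now-empty temporary directory.
        Files.delete(dir);
        return fileGone && dirStillThere;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("parent directory survives: " + parentSurvivesDisposal());
    }
}
```

Asserting on the resulting directory state, rather than only on the absence of a particular `delete` call, keeps the check valid even if the handle were to remove the parent through a different method.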

@StephanEwen
Contributor Author

Merged in 31e97e5
