@tillrohrmann
Contributor

What is the purpose of the change

Introduce the AbstractHandler which takes a typed request and returns an untyped
response. The AbstractRestHandler extends the AbstractHandler to add typed responses.

Introduce AbstractTaskManagerFileHandler which encapsulates the file loading logic.
Upon request of a TaskManager file, the handler will trigger the file upload via
the ResourceManager. The returned TransientBlobKey is then downloaded via the
TransientBlobService. Once downloaded, the file is served to the client. Each
transient blob key is cached for a maximum duration, after which it is purged and
the file has to be re-uploaded by the TaskExecutor.
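
The caching behaviour described above could be sketched roughly as follows. This is a minimal illustration that uses only the JDK and is not the code from this PR: `ExpiringKeyCache`, the injected clock, and the `uploader` function are hypothetical names, and the actual handler may rely on a different cache implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical sketch: caches one upload result per id for a bounded time. */
final class ExpiringKeyCache<K, V> {

    /** A cached upload future together with its expiry timestamp. */
    private static final class Entry<V> {
        final CompletableFuture<V> future;
        final long expiresAtMillis;

        Entry(CompletableFuture<V> future, long expiresAtMillis) {
            this.future = future;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;                          // assumed time source, injectable for tests
    private final Function<K, CompletableFuture<V>> uploader;  // assumed stand-in for the upload request

    ExpiringKeyCache(long ttlMillis, LongSupplier clock, Function<K, CompletableFuture<V>> uploader) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        this.uploader = uploader;
    }

    /** Returns the cached future, or triggers a new upload if the entry is missing or expired. */
    CompletableFuture<V> get(K id) {
        final long now = clock.getAsLong();
        return entries.compute(id, (key, old) -> {
            if (old != null && now < old.expiresAtMillis) {
                return old; // still valid: no new upload
            }
            return new Entry<>(uploader.apply(key), now + ttlMillis);
        }).future;
    }

    /** Drops the entry so that the next request triggers a fresh upload. */
    void invalidate(K id) {
        entries.remove(id);
    }
}
```

Injecting the clock keeps the expiry logic testable without sleeping; a production cache would also need to evict expired entries that are never requested again.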

This PR is based on #5341

Brief change log

  • Introduced untyped response handler AbstractHandler
  • Added AbstractTaskManagerFileHandler which is responsible for serving files from the TaskExecutor
  • The AbstractTaskManagerFileHandler triggers the file upload via the ResourceManager, which knows the TaskExecutors; it also caches the TransientBlobKeys so that not every request triggers a file upload
  • Added TaskManagerLogFileHandler to serve the log file
  • Added TaskManagerStdoutFileHandler to serve the stdout file

Verifying this change

  • Added AbstractTaskManagerFileHandlerTest
  • Tested functionality manually

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@tillrohrmann tillrohrmann force-pushed the enableTaskManagerLogRetrieval branch 5 times, most recently from 0b66d69 to dac8d9b Compare January 29, 2018 10:49
* @param timeout for the asynchronous operation
* @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
*/
CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
Member

Don't we need to clean up these methods?

	@Override
	public CompletableFuture<TransientBlobKey> requestTaskManagerLog(Time timeout) {
//		return taskExecutorGateway.requestTaskManagerLog(timeout);
		throw new UnsupportedOperationException("Operation is not yet supported.");
	}

	@Override
	public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) {
//		return taskExecutorGateway.requestTaskManagerStdout(timeout);
		throw new UnsupportedOperationException("Operation is not yet supported.");
	}

Contributor Author

Yes, we should adapt RpcTaskManagerGateway accordingly.

config,
handlerConfig,
resourceManagerGatewayRetriever,
transientBlobService,
Member

I don't think we need these static final variables. Can just use NoOpTransientBlobService.INSTANCE as a constructor argument.

Contributor Author

True, will change it.

final InetSocketAddress currentServerAddress = serverAddress;

if (currentServerAddress != null) {
return new BlobClient(serverAddress, blobClientConfig);
Member

I think you should use currentServerAddress, or the creation can fail with an NPE.

Contributor Author

Definitely. Good catch!
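
For illustration, the pattern discussed in this thread (read a concurrently updated field once into a local variable, then use only that local) might look like the sketch below. It assumes the field is `volatile` and can be reset by another thread; `AddressHolder` and `describeTarget` are hypothetical names, not classes from this PR.

```java
import java.net.InetSocketAddress;

/** Hypothetical sketch of the "snapshot a volatile field" pattern. */
final class AddressHolder {

    // Assumed to be updated concurrently, e.g. when a new server address arrives.
    private volatile InetSocketAddress serverAddress;

    void setServerAddress(InetSocketAddress newAddress) {
        this.serverAddress = newAddress;
    }

    String describeTarget() {
        // Read the field exactly once; later reads could observe a different value.
        final InetSocketAddress currentServerAddress = serverAddress;

        if (currentServerAddress == null) {
            throw new IllegalStateException("No server address has been set yet.");
        }

        // Use the local copy here, never the field, so the null check above still applies.
        return currentServerAddress.getHostString() + ":" + currentServerAddress.getPort();
    }
}
```

Using the field after the null check would reintroduce the race: another thread could reset it between the check and the use.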

(Void ignored, Throwable throwable) -> {
if (throwable != null) {
log.debug("Failed to transfer file from TaskExecutor {}.", taskManagerId, throwable);
fileBlobKeys.invalidate(taskManagerId);
Member

@GJL GJL Feb 5, 2018

It's not enough to invalidate the cache; you must also send an error back to the client, or the HTTP request can never be completed.

This will block forever if the TM is not registered:

curl -v http://localhost:9067/taskmanagers/daecac46c3f0f13b945fd2bb94438204/log

Contributor Author

This is true. Will add an error response here.
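
A minimal sketch of the fix agreed on here, assuming the file transfer is represented by a `CompletableFuture`: on failure the handler both invalidates the cached entry and answers the client. `FileTransferSketch`, `ResponseSink`, and the status codes are hypothetical stand-ins for the real handler, the Netty channel, and whatever error response the PR ends up using.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: always answer the client, even when the transfer fails. */
final class FileTransferSketch {

    /** Assumed stand-in for the HTTP response channel. */
    interface ResponseSink {
        void sendFile(String content);
        void sendError(int status, String message);
    }

    // Assumed stand-in for the cache of pending or finished transfers.
    private final Map<String, CompletableFuture<String>> transfers = new ConcurrentHashMap<>();

    void put(String taskManagerId, CompletableFuture<String> transfer) {
        transfers.put(taskManagerId, transfer);
    }

    boolean isCached(String taskManagerId) {
        return transfers.containsKey(taskManagerId);
    }

    CompletableFuture<Void> serve(String taskManagerId, ResponseSink sink) {
        final CompletableFuture<String> transfer = transfers.get(taskManagerId);
        if (transfer == null) {
            sink.sendError(404, "Unknown TaskExecutor " + taskManagerId + '.');
            return CompletableFuture.completedFuture(null);
        }
        return transfer.<Void>handle((content, throwable) -> {
            if (throwable != null) {
                // Invalidate so that a later request retries the upload ...
                transfers.remove(taskManagerId);
                // ... and complete the HTTP request instead of leaving it hanging.
                sink.sendError(500, "Failed to transfer file from TaskExecutor " + taskManagerId + '.');
            } else {
                sink.sendFile(content);
            }
            return null;
        });
    }
}
```

With a transfer that fails (for example because the TaskManager is not registered), the client receives an error response instead of waiting indefinitely.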

@tillrohrmann tillrohrmann force-pushed the enableTaskManagerLogRetrieval branch from 4a27bd1 to 1be552e Compare February 6, 2018 10:06
tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Feb 6, 2018
…b ui

This closes apache#5353.
@tillrohrmann
Contributor Author

Thanks for the review @GJL. I've addressed your comments in ca17896 and rebased onto the latest master.

Instead of creating a dedicated BlobCacheService for each new JobManagerConnection,
the TaskExecutor uses a single BlobCacheService which it shares between the
different JobManagerConnections. The initial BlobServer address is passed by the
ResourceManager when the TaskExecutor registers at it. In order to avoid the re-
creation of BlobCacheServices, this commit changes the behaviour such that one can
update the BlobServer address.

This closes apache#5350.
The JobManagerRunnerMockTest is completely ignored. Moreover, it tests things with
heavy usage of Mockito which is hard to maintain.
This commit removes the LibraryCacheManager from the JobMaster since it is
no longer needed. The JobMaster is started with the correct user code class
loader and, thus, does not need the LibraryCacheManager.

This commit also corrects that the BlobServer is not closed by the
JobManagerServices#shutdown method.

This closes apache#5352.
@tillrohrmann tillrohrmann force-pushed the enableTaskManagerLogRetrieval branch from 1be552e to be8feb8 Compare February 6, 2018 11:15
@asfgit asfgit closed this in 5f57380 Feb 6, 2018
@tillrohrmann tillrohrmann deleted the enableTaskManagerLogRetrieval branch February 13, 2018 09:10