[FLINK-9280][rest] Rework JobSubmitHandler to accept jar/artifact files #6147

Closed
zentol wants to merge 7 commits into apache:master from zentol:9280

Conversation

@zentol
Contributor

@zentol zentol commented Jun 11, 2018

What is the purpose of the change

This PR reworks the JobSubmitHandler to also accept jar/artifact files. Previously these files had to be uploaded preemptively to the blob-service by the client. With this change the entire job submission goes through REST.

This PR addresses 3 JIRAs in total:

FLINK-9382
Directories given to the blob-service (primarily a use-case for the distributed cache) are currently silently zipped, and later unzipped by the FileCache. This tightly couples the zipping logic in the blob-service to the unzipping logic of the FileCache. The blob-service neither unzips the directory when the blob is requested, nor provides any means of doing so manually, nor informs the user whether the requested blob is a zip or not.

My conclusion in FLINK-9382 is that the blob-service should not support directories for now, and that instead directories for the distributed cache should be explicitly zipped beforehand, given that this is the only use-case we have at the moment.

This JIRA is related to FLINK-9280 as the zipping logic was necessary for the upload of directories from the client via REST. Since the server thus receives all artifacts already in zipped form, we forward them as-is to the blob-service, making blob-service support for directories obsolete.

The zipping is now done in JobGraph#uploadUserArtifacts with utilities provided by the FileCache class.
The unzipping is still done by the FileCache. Furthermore, we now no longer delete the zip after processing, as this file is managed by the blob-service.
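For illustration, here is a minimal sketch of zipping a directory ahead of upload using plain java.util.zip. This is a hypothetical helper, not the actual FileCache utility; note that it skips empty directories, which a complete implementation would need to handle.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public final class ZipSketch {

    // Zips the regular files under 'directory' into 'targetZip', storing
    // entries with paths relative to the directory root. Empty directories
    // are not represented in the archive.
    public static Path compressDirectory(Path directory, Path targetZip) throws IOException {
        try (OutputStream out = Files.newOutputStream(targetZip);
             ZipOutputStream zos = new ZipOutputStream(out);
             Stream<Path> files = Files.walk(directory)) {
            for (Path file : (Iterable<Path>) files::iterator) {
                if (Files.isRegularFile(file)) {
                    zos.putNextEntry(new ZipEntry(directory.relativize(file).toString()));
                    Files.copy(file, zos);
                    zos.closeEntry();
                }
            }
        }
        return targetZip;
    }
}
```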

FLINK-9500
In some cases (I don't know exactly when) it can happen that an empty LastHttpContent is sent at the end of a FileUpload. This currently leads to an exception in the FileUploadHandler when calling currentHttpPostRequestDecoder.hasNext().

Fortunately, the empty LastHttpContent message is a singleton (LastHttpContent.EMPTY_LAST_CONTENT), which allows us to easily check for it in the FileUploadHandler. If detected we skip the payload processing.
Note that we still offer this content to the decoder, as this part is handled without exception and appears to follow an expected life-cycle.

This issue was also triggered by FLINK-9280, which now serves as verification for the fix.

FLINK-9280
This issue is addressed in 5 commits that must be squashed before a merge.

The commit Move channel setup into utility method is a simple refactoring to allow re-using code.
The commit Remove BlobServer port handler removes various classes related to requesting the blobserver port via REST, which is now obsolete.
The commit add new constructor for DCEntry adds another constructor to the DistributedCacheEntry class for setting the isZipped flag on the client-side. The documentation was also extended to cover the life-cycle of entries for directories.

The last 2 commits contain the actual implementation and are separated by client/server.

The following is an outline of the events after RestClusterClient#submitJob has been called:

  • directories registered for the distributed cache are zipped, and dc entries are updated using the newly added constructor
  • the jobgraph, jars and local artifacts (dc files) are sent to the Dispatcher by the RestClient as a multi-part request
    • the jobgraph is contained in a JobSubmitRequestBody and stored as an Attribute
    • each jar/artifact is stored as a separate FileUpload
  • the FileUploadHandler receives the request and stores the received parts in a JobSubmitRequestBodyBuffer. Once the request is fully read the buffer is converted into a proper JobSubmitRequestBody and passed to the rest of the pipeline as an attribute. In other words we inject the paths to uploaded jars/artifacts into the submitted JobSubmitRequestBody. Unfortunately we are also parsing the original json payload here, which ideally should be done by the handler for consistency.
  • the modified JobSubmitRequestBody is read in AbstractHandler#respondAsLeader, cast, and passed on in place of the original request
  • The JobSubmitHandler modifies the JobGraph to no longer refer to client-local jars/artifacts, in preparation for the job-submission. Jar entries are categorically overridden as jars are always uploaded. Artifacts are only overridden if an uploaded file exists, identified by the file name.
  • jars/artifacts are uploaded to the BlobService; this was previously done in the ClusterClient
  • job is submitted, as before
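The wire format of such a submission can be sketched in plain Java (hypothetical part names; the actual request is built with Netty's HttpPostRequestEncoder): the JSON request body travels as an ordinary form attribute, and each jar/artifact as a separate file part.

```java
import java.nio.charset.StandardCharsets;

public final class MultipartSketch {

    // Builds a minimal multipart/form-data body: one attribute part carrying
    // the JSON request, followed by a file part for an uploaded jar/artifact.
    public static String buildBody(String boundary, String requestJson,
                                   String fileName, byte[] fileContent) {
        StringBuilder sb = new StringBuilder();
        // attribute part with the JSON payload
        sb.append("--").append(boundary).append("\r\n")
          .append("Content-Disposition: form-data; name=\"request\"\r\n\r\n")
          .append(requestJson).append("\r\n");
        // file part with the binary payload
        sb.append("--").append(boundary).append("\r\n")
          .append("Content-Disposition: form-data; name=\"jarfiles\"; filename=\"")
          .append(fileName).append("\"\r\n")
          .append("Content-Type: application/octet-stream\r\n\r\n")
          .append(new String(fileContent, StandardCharsets.UTF_8)).append("\r\n");
        // closing boundary marks the end of the request body
        sb.append("--").append(boundary).append("--\r\n");
        return sb.toString();
    }
}
```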

Brief change log

  • extend RestClient to support sending jobgraph, jars and artifacts as multipart http request
  • modify FileUploadHandler to handle job-submission-specific multipart requests
  • modify JobSubmitHandler to override jar/artifact entries pointing to client-local files to instead point to uploaded files
  • move jar/artifact blob-service upload logic from RestClusterClient to JobSubmitHandler

Verifying this change

FLINK-9382 is covered by added tests (see the relevant commit) and the existing distributed-cache and python E2E tests.

FLINK-9500 is implicitly tested by FLINK-9280.

FLINK-9280:

  • job-submission as a whole is tested by existing E2E tests and RestClusterClientTest.
  • changes to the JobSubmitHandler are covered in JobSubmitHandlerTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not documented)

Contributor

@tillrohrmann tillrohrmann left a comment

I had some comments mainly concerning the code structure. I think it would be nice if we could keep the JobGraph free of the whole upload and zipping logic.

}
}

public static Path compressDirectory(Path directory) throws IOException {
Contributor

Should we move these methods rather in the FileUtils class as generic convenience methods?

Contributor

If so, then we should also pass the target path to this method.

} else {
Files.copy(zis, newFile);
//noinspection ResultOfMethodCallIgnored
newFile.toFile().setExecutable(isExecutable);
Contributor

This method violates the SRP by expanding the zip and setting file permissions. Might be easier to separate these steps (especially if we move these methods to FileUtils).

Contributor

This might also be problematic if we have a mixed directory which contains some executable and non-executable files.

Contributor Author

yes, this is very much a hack.

We need a way to attach meta-data for every file (i.e. an executable flag); maybe a .metadata file in the zip that contains an entry for each file.

CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) throws Exception {
CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, File target) throws Exception {
this.isExecutable = e.isExecutable;
this.isDirectory = e.isZipped;
Contributor

Can't normal files be also zipped (e.g. text files)?

Contributor Author

Technically yes, but for the distributed cache we only zip directories. If a user registers a zipped file it is their responsibility to expand it.

Contributor

Should we rename the DistributedCacheEntry#isZipped then to isDirectory?

Contributor Author

How about isZippedDirectory? isDirectory is misleading since the file is not actually a directory.

Contributor

Sounds good

@@ -273,26 +269,8 @@ public Path call() throws IOException {
final File file = blobService.getFile(jobID, blobKey);
Contributor

Here is a slight inconsistency with respect to the file type. If the file under blobKey is a file, then it is left in the ownership of the BlobService. If it is a zipped directory, then we expand the file under <tmp-dir>/tmp_<jobID>/ which is deleted 5 seconds after the last task holding a reference to it is released. This is not a deal breaker but we should at least update the java docs such that they state the difference.

Contributor Author

I have already raised this issue in FLINK-9382.

Contributor

Isn't this commit intended to solve the problems of FLINK-9382 and, thus, should fix it?

Contributor Author

It does so partially; we now no longer delete the zip retrieved from the blob-service.

We could fix the ownership problem by copying simple files as well to the storage-directory, but that seems wasteful.

Contributor

Then I would suggest to update the JavaDoc stating that only zipped directories are copied.

}

public void uploadUserArtifacts(InetSocketAddress blobServerAddress, Configuration clientConfig) throws IOException {
zipUserArtifacts();
Contributor

Could we say that all userArtifacts are already zipped if they are a directory before being added via addUserArtifact. This means that the caller is responsible for the zipping. That way we could get rid of modifying the JobGraph as a side effect of uploadUserArtifacts.

Contributor Author

We would have to do this either right away in ExEnv#registerCachedFile or in ExEnv#registerCachedFilesWithPlan.

Contributor

Or in JobGraphGenerator and StreamingJobGraphGenerator.

Contributor Author

I don't know that part of the code; my guess was that we want to keep the transition from Plan -> JobGraph straightforward.

Contributor

I think it would be better to change these parts instead of pushing this logic into the JobGraph.

@@ -593,10 +596,37 @@ public void uploadUserArtifacts(InetSocketAddress blobServerAddress, Configurati
new DistributedCache.DistributedCacheEntry(
Contributor

Should we maybe distinguish between a DistributedCacheEntry which represents an accessible file and a BlobServerStoredDistributedCacheEntry which knows the BlobKey under which the file is stored. This would make the whole upload and download business a bit clearer.

Contributor

Then isZipped would only be part of the BlobServerStoredDistributedCacheEntry.


assertTrue(Files.exists(extractDir.resolve(file1)));
assertFalse(Files.isDirectory(extractDir.resolve(file1)));
assertEquals(Files.size(compressDir.resolve(file1)), Files.size(extractDir.resolve(file1)));
Contributor

Shouldn't we also test that the contents are equal?


FileCache.expandDirectory(new File(zip.getPath()), extractDir.toFile(), false);

assertTrue(Files.exists(extractDir.resolve(originalDir)));
Contributor

Maybe we could make the whole directory tree comparison more automated by following https://stackoverflow.com/a/39584230/4815083. What do you think?

Contributor Author

I'll try to come up with something. The linked solution does not cover empty directories and does not detect additional files in one direction (and I'd prefer if we didn't have to run it twice with reverse arguments).
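One way to cover both concerns above (empty directories and extra files on either side) is to snapshot each tree into a map of relative paths and compare the maps, so a single equality check works in both directions. A rough sketch with hypothetical names, comparing file sizes rather than full contents (a thorough version would hash the file contents):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

public final class DirCompareSketch {

    // Maps every entry (files and directories) to its relative path; files
    // additionally carry their size, directories a sentinel of -1.
    static Map<String, Long> snapshot(Path root) throws IOException {
        Map<String, Long> entries = new TreeMap<>();
        try (Stream<Path> walk = Files.walk(root)) {
            for (Path p : (Iterable<Path>) walk::iterator) {
                if (p.equals(root)) {
                    continue;
                }
                entries.put(root.relativize(p).toString(),
                        Files.isDirectory(p) ? -1L : Files.size(p));
            }
        }
        return entries;
    }

    // Equal snapshots imply the same tree shape in both directions,
    // including empty directories and missing/extra files on either side.
    public static boolean treesEqual(Path a, Path b) throws IOException {
        return snapshot(a).equals(snapshot(b));
    }
}
```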

}

@Test
public void testCompression() throws IOException {
Contributor

If we move the zip and expand methods to FileUtils, then this test should be moved as well.

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

public class JobGraphTest {
Contributor

Could extend TestLogger

Contributor

@tillrohrmann tillrohrmann left a comment

I don't understand why we need to filter out the LastHttpContent.EMPTY_LAST_CONTENT. It would be great if you could give me some more context @zentol.

final HttpContent httpContent = (HttpContent) msg;
currentHttpPostRequestDecoder.offer(httpContent);

while (currentHttpPostRequestDecoder.hasNext()) {
Contributor

How exactly can this problem be reproduced? I don't fully understand why we need to filter out LastHttpContent, because it should be consumable by the currentHttpPostRequestDecoder. The only problem I can see from the code of the HttpPostMultipartRequestDecoder is if we are in state MultiPartStatus.EPILOGUE and then call hasNext.

Contributor Author

The failure was triggered every time I attempted to upload anything, but it may very well depend on the payload size.

Your analysis is mostly correct; what's missing is that the decoder switches into the EPILOGUE state when being offered a LastHttpContent.
If this last message is not empty the exception is not thrown since data is still available, as checked in hasNext via this.bodyListHttpDataRank >= this.bodyListHttpData.size().

Contributor

Could you create a test case which reproduces the problem? Running the JobSubmitHandlerTest without this change did not reproduce the problem. I would like to see whether it is indeed a Netty bug or if we are simply doing something wrong which we cover up with this fix.

Contributor Author

The JobSubmitHandlerTest never transmits any files via netty. The failure would occur in the FileUploadHandler which is currently completely untested.

I'll try to find a test that can reproduce this

Contributor

@tillrohrmann tillrohrmann left a comment

Typo in commit message and a single minor comment.

executor);
}

private CompletableFuture<Channel> createChannelFuture(String targetAddress, int targetPort) {
Contributor

Maybe rename to openConnection or connectTo.

Contributor

@tillrohrmann tillrohrmann left a comment

I think we should refactor the DistributedCacheEntry. At the moment it serves too many too different purposes.

* Before the job is submitted to the cluster directories are zipped, at which point {@code filePath} denotes the path to the local zip.
* After the upload to the cluster, {@code filePath} denotes the (server-side) copy of the zip.
*/
public static class DistributedCacheEntry implements Serializable {
Contributor

It might be out of scope of this PR but I think the DistributedCacheEntry mixes too many responsibilities. On the one hand it is used to transport cache entry information like isZipped, blobKey and isExecutable which is only relevant for the job submission. On the other hand, it also contains information about which files to transmit to the cluster at the job creation time. I think it would be a good idea to separate these responsibilities. As a side effect, we would not have nullable fields such as the blobKey in this class.

Contributor

Actually, you just added JavaDocs stating all the different purposes of this class. I really think that we should split this class up into dedicated classes.

Contributor

As a neat side effect, we could also refactor how this information is sent to the cluster, namely changing it such that it is no longer serialized into the Configuration.

Contributor

@tillrohrmann tillrohrmann left a comment

I think we should change the RestClient a bit to reduce redundant code.

})
.collect(Collectors.toList());

return restClient.sendRequest(
Contributor

Let's add retries for this call by adding a sendRetriableRequest(...) with correct signature. Even better would be to add a sendRequest(...) with the correct signature which dispatches to sendRetriableRequest.

.thenCompose(webMonitorBaseUrl -> {
try {
return sendRequest(
jobGraph.zipUserArtifacts();
Contributor

I think zipping should not be the responsibility of the JobGraph.

@@ -25,4 +25,6 @@
*/
public class RestConstants {
Contributor

This could be an enum. That way we would get all the nice singleton properties for free.

Contributor Author

yes, good point
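As a sketch of that suggestion (hypothetical names and values, not the actual RestConstants), an enum gives each constant guaranteed singleton semantics, so values are serialization-safe and can be compared with ==:

```java
// Hypothetical enum variant of a constants holder for content types.
public enum ContentTypeSketch {
    JAR("application/java-archive"),
    BINARY("application/octet-stream");

    private final String mimeType;

    ContentTypeSketch(String mimeType) {
        this.mimeType = mimeType;
    }

    public String getMimeType() {
        return mimeType;
    }
}
```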


bodyRequestEncoder.finalizeRequest();
} catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
return org.apache.flink.runtime.concurrent.FutureUtils.completedExceptionally(e);
Contributor

nit: Could import FutureUtils.

// takes care of splitting the request into multiple parts
HttpPostRequestEncoder bodyRequestEncoder;
try {
bodyRequestEncoder = new HttpPostRequestEncoder(factory, httpRequest, true);
Contributor

Must it be strictly a multi-part request?

addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder);
addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder);

bodyRequestEncoder.finalizeRequest();
Contributor

If it's not a multi-part request, then we should send the HttpRequest which is returned here.

Contributor Author

I think we can always send the request that the encoder returns.

executor)
.thenComposeAsync(
(JsonResponse rawResponse) -> parseResponse(rawResponse, objectMapper.constructType(messageHeaders.getResponseClass())),
executor);
Contributor

It is not necessary to specify the executor again. Calling thenCompose is enough.
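To illustrate the point with a hypothetical example (not the RestClient code): thenComposeAsync(fn, executor) forces the composition step onto the executor, while plain thenCompose runs it on whichever thread completed the previous stage, which is sufficient when that step is cheap and the previous stage already ran on the executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public final class ComposeSketch {

    // The parse step is cheap, so it does not need its own executor hop:
    // plain thenCompose after supplyAsync(..., executor) is sufficient.
    public static CompletableFuture<Integer> fetchAndParse(ExecutorService executor) {
        return CompletableFuture
                .supplyAsync(() -> "42", executor) // stands in for a raw response
                .thenCompose(raw ->
                        CompletableFuture.completedFuture(Integer.parseInt(raw)));
    }
}
```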

executor);
}

public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
Contributor

This method and the other sendRequest method contain a lot of duplicate code. Can we simplify this by having a sendRequestInternal method which takes a RequestBodyProvider which can be for POST, PUT and OPTIONS be a HttpPostRequestEncoder and for all other verbs a VoidBodyProvider?

U messageParameters,
R request,
Collection<Path> jars,
Collection<Path> userArtifacts) throws IOException {
Contributor

It would be great if the RestClient did not know about the distinction between jars and userArtifacts. Instead it should be enough to provide this method a collection of FileUpload objects which contain the path and some meta information to make sense of the different files contained in the body on the receiving side.

Contributor Author

So far we managed to not expose netty stuff in the RestClient API, I would prefer if we didn't start now.

Could we not just pass a single Collection<Path> instead? The content-type would then always be application/octet-stream.

Contributor

I didn't mean to use Netty's FileUpload class but creating our own where we could specify the content type and other information. But we can begin also with Collection<Path> and encode what kind of file it is in the file name, for example.

bodyRequestEncoder.addBodyHttpData(requestAttribute);

addPathsToEncoder(jars, FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE, bodyRequestEncoder);
addPathsToEncoder(userArtifacts, FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE, bodyRequestEncoder);
Contributor

If we can send arbitrary files to the server and let the respective handler make sense of what is in what file, then we would also not need to introduce the different attributes.

Contributor

@tillrohrmann tillrohrmann left a comment

I think we should try to generalize the FileUploadHandler instead of baking in a dependency on the JobSubmitHandler with its specific types. Moreover, we should add tests for the FileUploadHandler verifying that it does what we want it to do.

}
}

private static String getDispatcherHost(DispatcherGateway gateway) {
Contributor

Let's call it getDispatcherHostname

// if the dispatcher address does not contain a host part, then assume it's running
// on the same machine as the handler
return "localhost";
});
Contributor

I think this could be replaced by gateway.getHostname().

/**
* Updates the jar entries in the given JobGraph to refer to the uploaded jar files instead of client-local files.
*/
private static void updateJarEntriesInJobGraph(JobGraph jobGraph, Collection<Path> uploadedJars, Logger log) {
Contributor

Why do we need to update the jar file names in the JobGraph? I thought JobGraph#userJars is only used by the client to learn which jars to upload to the cluster.

Contributor Author

correct, this field is used in JobGraph#uploadUserJars to upload the jars to the blob-server. Since this is now done on the server but the original entries still point to client-local files we have to update the entries.

If we move the upload out of the jobgraph we can skip this step.

final List<PermanentBlobKey> keys;
try {
keys = BlobClient.uploadFiles(address, config, jobGraph.getJobID(), jobGraph.getUserJars());
jobGraph.uploadUserArtifacts(address, config);
Contributor

Instead of calling updateUserArtifactEntriesInJobGraph and then jobGraph.uploadUserArtifacts we could simply take requestBody.getUploadedArtifacts upload them to the BlobServer and add the blob keys to the JobGraph such that it knows where to retrieve the user artifacts from.

jobGraph.uploadUserArtifacts(address, config);
} catch (IOException ioe) {
log.error("Could not upload job jar files.", ioe);
throw new CompletionException(new RestHandlerException("Could not upload job jar files.", HttpResponseStatus.INTERNAL_SERVER_ERROR));
Contributor

we could add the cause ioe to the RestHandlerException.

final byte[] requestJson = request.get();
JobSubmitRequestBody jobSubmitRequestBody = RestMapperUtils.getStrictObjectMapper().readValue(requestJson, JobSubmitHeaders.getInstance().getRequestClass());
currentJobSubmitRequestBuffer.setJobGraph(jobSubmitRequestBody.serializedJobGraph);
}
Contributor

I think we are mixing here a lot of handler specific knowledge into this handler and thereby creating a very strong coupling between multiple components. Moreover, this handler seems to deserialize json which is rather the responsibility of the AbstractHandler.

"_" + fileUpload.getFilename()));
fileUpload.renameTo(dest.toFile());
ctx.channel().attr(UPLOADED_FILE).set(dest);
if (currentHttpRequest.getUri().equals(JobSubmitHeaders.getInstance().getTargetRestEndpointURL())) {
Contributor

I think the FileUploadHandler should not know about the JobSubmitHandler. Instead it should only be responsible for receiving uploaded files, storing them in a temp directory and then making them accessible to a downstream handler (e.g. through an Attribute in the AttributeMap). In order to defer the deserialization of the Json part of the payload, we could create a new HttpRequest which contains exactly the data sent as a MemoryAttribute (the branch which matches InterfaceHttpData.HttpDataType.Attribute).

if (httpContent instanceof LastHttpContent) {
if (currentJobSubmitRequestBuffer != null) {
ctx.channel().attr(SUBMITTED_JOB).set(currentJobSubmitRequestBuffer.get());
}
Contributor

Instead of setting the SUBMITTED_JOB attribute I think it could be enough to set the set of uploaded files as an attribute and then send the json payload to the downstream handler (AbstractHandler). Then we would not need to construct the JobSubmitRequestBodyBuffer as an intermediate helper structure.

} else if (untypedResponseMessageHeaders == JobSubmitHeaders.getInstance()) {
final JobSubmitRequestBody jobSubmission = ctx.channel().attr(FileUploadHandler.SUBMITTED_JOB).get();
//noinspection unchecked
request = (R) jobSubmission;
Contributor

Not sure whether I would make the job submission a special case here. What if other requests will allow in the future to upload files as well. Alternatively, we could make the attribute map or the set of uploaded files accessible to the AbstractRestHandler implementations. Then every handler could implement the support for uploaded files themselves. What do you think?

Assert.assertEquals(1, userJars.size());

// this entry should be changed, a replacement jar exists in the server storage directory
Assert.assertEquals(new org.apache.flink.core.fs.Path(serverStorageDirectory.resolve(jar).toUri()), userJars.get(0));
Contributor

I think updating JobGraph#userJars and JobGraph#userArtifacts is not really necessary. Maybe we should even mark them transient in order to emphasize that they won't be transmitted. Given that, I think we don't have to do these tests.

@zentol
Contributor Author

zentol commented Jun 19, 2018

I will split this PR to address the various issues separately.
