[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph #6199
Conversation
import static org.junit.Assert.assertEquals;

/**
 * TODO: add javadoc.
missing javadoc
uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);

for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
public void finalizeUserArtifactEntries() {
missing test
for (PermanentBlobKey key : keys) {
jobGraph.addUserJarBlobKey(key);
List<Path> userJars = jobGraph.getUserJars();
Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
This entire block is effectively duplicated in several classes and could also be moved to ClientUtils, but I wasn't sure whether this wouldn't put too much logic into a single method.
 * @param blobClient client to upload jars with
 * @throws IOException if the upload fails
 */
public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
JarRunHandler could use this method as well.
Good point, let's do it then.
setUserArtifactBlobKeys(jobGraph, blobKeys);
}

private static Collection<Tuple2<String, PermanentBlobKey>> uploadUserArtifacts(JobID jobID, Map<String, DistributedCache.DistributedCacheEntry> userArtifacts, BlobClient blobClient) throws IOException {
Signature could be changed to accept a Map<String, Path> instead. For consistency of in- and output we could also pass this as a Collection<Tuple2>.
List<Path> userJars = jobGraph.getUserJars();
Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
try (BlobClient client = new BlobClient(address, flinkConfig)) {
Alternatively we could refactor the try-with-resource statement and exception handling into a method that accepts a function, which would be used like this:

ClientUtils.withBlobClient(address, flinkConfig, client -> {
    log.info("Uploading jar files.");
    ClientUtils.uploadAndSetUserJars(jobGraph, client);
    log.info("Uploading jar artifacts.");
    ClientUtils.uploadAndSetUserArtifacts(jobGraph, client);
});
I would be in favour of having a ClientUtils#uploadJobGraphFiles(jobGraph, flinkConfig, Supplier<BlobClient>) which basically does what's being done here.
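The shape of the suggested helper can be sketched as follows. This is a minimal illustration using hypothetical stand-in types (SupplierWithException, a dummy BlobClient), not Flink's actual classes: the helper owns the client's lifecycle via try-with-resources, and callers only supply how to obtain a client and what to upload with it.

```java
import java.util.function.Consumer;

public class UploadHelperSketch {
    // Hypothetical stand-in for a supplier that may throw a checked exception.
    interface SupplierWithException<T, E extends Exception> {
        T get() throws E;
    }

    // Dummy stand-in for BlobClient, only tracking whether close() ran.
    static class BlobClient implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    // The suggested shape: the helper opens and closes the client itself;
    // the caller passes the upload actions as a function of the client.
    static void uploadJobGraphFiles(
            SupplierWithException<BlobClient, Exception> clientSupplier,
            Consumer<BlobClient> uploadAction) throws Exception {
        try (BlobClient client = clientSupplier.get()) {
            uploadAction.accept(client);
        }
    }

    public static void main(String[] args) throws Exception {
        final BlobClient[] seen = new BlobClient[1];
        uploadJobGraphFiles(BlobClient::new, client -> seen[0] = client);
        // The helper closed the client after the upload action ran.
        System.out.println(seen[0].closed);
    }
}
```

This removes the duplicated try-with-resources and exception handling from every call site while still letting each submission path decide how a BlobClient is created.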
Changes look good to me @zentol. I think it would be a good idea to remove the code redundancy by introducing a ClientUtils#uploadJobGraphFiles method which encapsulates the logic. Moreover, one could get rid of writing the user artifacts into the job configuration, which would avoid the two-phase user artifact upload procedure, which is harder to maintain. What do you think?
Path path = new Path(userArtifact.getValue().filePath);
// only upload local files
if (!path.getFileSystem().isDistributedFS()) {
final PermanentBlobKey blobKey = blobClient.uploadFile(jobID, new Path(userArtifact.getValue().filePath));
we could reuse path here
try {
serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
} catch (IOException e) {
throw new FlinkRuntimeException("Could not serialize blobkey " + blobKey + ".", e);
I would not throw a FlinkRuntimeException here. Instead we could let the IOException bubble up.
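The difference can be sketched with a hypothetical serializer standing in for InstantiationUtil.serializeObject (not Flink's actual implementation): declaring throws IOException lets the checked exception propagate unchanged instead of wrapping it in an unchecked FlinkRuntimeException.

```java
import java.io.IOException;

public class BubbleUpSketch {
    // Hypothetical serializer standing in for InstantiationUtil.serializeObject.
    static byte[] serializeObject(Object o) throws IOException {
        if (o == null) {
            throw new IOException("cannot serialize null");
        }
        return o.toString().getBytes();
    }

    // Declares the checked exception instead of catching and wrapping it,
    // so callers see the original IOException and can handle the failure.
    static byte[] serializeBlobKey(Object blobKey) throws IOException {
        return serializeObject(blobKey);
    }

    public static void main(String[] args) {
        try {
            serializeBlobKey(null);
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```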
uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);

for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
public void finalizeUserArtifactEntries() {
Maybe rename to writeUserArtifactEntriesToConfiguration
uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);

for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
public void finalizeUserArtifactEntries() {
I think we would not need this method if we don't write the DistributedCacheEntries into the configuration. If I'm not mistaken, then we send the userArtifacts map anyway to the cluster. The things which are missing are: adding a serialVersionUID to the DistributedCacheEntry, and adding the userArtifacts to the TaskDeploymentDescriptor to send them to the TaskManager.
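The serialVersionUID part of the suggestion can be sketched as follows; this uses a hypothetical, simplified DistributedCacheEntry (the real Flink class has more fields). An explicit serialVersionUID keeps serialized instances compatible across compatible revisions of the class, which matters once the entries travel over the wire instead of through the configuration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DistributedCacheEntrySketch {
    // Hypothetical simplified entry; the real class carries more state.
    static class DistributedCacheEntry implements Serializable {
        // Pins the serialized form instead of relying on a compiler-generated UID.
        private static final long serialVersionUID = 1L;

        final String filePath;

        DistributedCacheEntry(String filePath) {
            this.filePath = filePath;
        }
    }

    public static void main(String[] args) throws Exception {
        DistributedCacheEntry entry = new DistributedCacheEntry("/tmp/artifact");

        // Round-trip through Java serialization, as a TaskDeploymentDescriptor would.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(entry);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            DistributedCacheEntry copy = (DistributedCacheEntry) ois.readObject();
            System.out.println(copy.filePath);
        }
    }
}
```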
I agree that this would be nice, but I think that this is out of scope of this PR as we would have to touch an entirely new set of classes.
Alright, please create a follow up JIRA issue.
assertEquals(jars.size(), jobGraph.getUserJars().size());
assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count());
Assert that we find the blob keys in the blob upload directory.
I will use blobServer.getFile() instead to verify the validity of the blob keys.
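The intent of that check can be sketched with an in-memory stand-in (not the real BlobServer API): fetching each recorded key back fails loudly for any key the server doesn't actually hold, so the test verifies the upload really happened.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

public class BlobKeyCheckSketch {
    // Hypothetical in-memory blob store standing in for the blob server.
    static class BlobStore {
        private final Map<String, byte[]> blobs = new HashMap<>();

        String put(byte[] data) {
            String key = "key-" + blobs.size();
            blobs.put(key, data);
            return key;
        }

        // Mirrors the idea of blobServer.getFile(key): throws for unknown keys.
        byte[] getFile(String key) {
            byte[] data = blobs.get(key);
            if (data == null) {
                throw new NoSuchElementException("unknown blob key: " + key);
            }
            return data;
        }
    }

    public static void main(String[] args) {
        BlobStore server = new BlobStore();
        List<String> keys = new ArrayList<>();
        keys.add(server.put(new byte[] {1}));
        keys.add(server.put(new byte[] {2}));

        // Verify every recorded key by fetching the blob back.
        boolean allValid = true;
        for (String key : keys) {
            allValid &= server.getFile(key) != null;
        }
        System.out.println(allValid);
    }
}
```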
// 1 unique key for each local artifact, and null for distributed artifacts
assertEquals(localArtifacts.size() + 1, jobGraph.getUserArtifacts().values().stream().map(entry -> entry.blobKey).distinct().count());
for (DistributedCache.DistributedCacheEntry original : localArtifacts) {
assertState(original, jobGraph.getUserArtifacts().get(original.filePath), false);
Assert that the blobs can be found in the blob server storage directory.
Test failure seems to be unrelated.
@tillrohrmann I believe I've addressed all comments.
try {
keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
try (BlobClient blobClient = new BlobClient(address, configuration)) {
ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
We could use uploadJobGraphFiles here, but there isn't really a use-case for uploading distributed cache artifacts when going through the JarRunHandler, since we're already on the server here.
But it would reduce code redundancy, right? If this is the case, then let's do it.
The JarRunHandler now also uses uploadJobGraphFiles().
Nice. +1 for merging.
What is the purpose of the change
This PR moves the logic for uploading jars/artifacts from the jobgraph into a separate utility class usable by all submission methods.
The new ClientUtils class exposes 2 methods for uploading jars/artifacts and setting the respective blob keys on the JobGraph. All existing job-submission methods were updated to use the new utilities and should now behave the same.

The subsumed methods in JobGraph were removed, but remnants of them remain in 2 added methods: ExecutionConfig. We could also do the latter in the JobManager when assembling the TaskDeploymentDescriptor; in any case we can now just shuffle this method around to where we want it.

Verifying this change
ClientUtils is tested in ClientUtilsTest.
JobGraph changes are covered in JobGraphTest.