[BEAM-4778] add option to flink job server to clean staged artifacts per-job #5958
1407f70
to
a60b9b5
Compare
Looking over this again, I'm going to revert the bits that make |
Thanks for taking this up.
Added a few comments.
Please add some test cases for this.
private final JobInvoker invoker;

private InMemoryJobService(
    Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
    Function<String, String> stagingServiceTokenProvider,
    ThrowingConsumer<String> cleanupJobFn,
Let's make it a jobTerminationListener.
done
state.equals(JobState.Enum.DONE) || state.equals(JobState.Enum.FAILED)) {
  String stagingSessionToken = stagingSessionTokens.get(preparationId);
  try {
    cleanupJobFn.accept(stagingSessionToken);
Given that it's a job termination/cleanup listener, we should be passing the job ID or the job object.
done
@@ -70,26 +71,43 @@
public static InMemoryJobService create(
    Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
    Function<String, String> stagingServiceTokenProvider,
    ThrowingConsumer<String> cleanupJobFn,
    Boolean cleanArtifactsPerJob,
We don't need the boolean. We can simply set the jobTerminationListener on the previous line, or leave it unset.
Alternatively, the jobTerminationListener can check this boolean internally.
done
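A minimal sketch of the suggestion in this thread, written in plain Java rather than against the actual Beam classes: the service takes a listener that receives the job ID once a job reaches a terminal state, and the configuration flag is checked inside the listener instead of being passed to the service. `JobTerminationListener`, `SketchJobService`, `State`, and `cleanupListener` are hypothetical names used only for illustration.

```java
import java.util.List;

/** Hypothetical callback type; the PR's real type may differ. */
interface JobTerminationListener {
  void onTermination(String jobId) throws Exception;
}

/** Hypothetical stand-in for a job service, not Beam's InMemoryJobService. */
class SketchJobService {
  enum State { RUNNING, DONE, FAILED }

  private final JobTerminationListener listener;

  SketchJobService(JobTerminationListener listener) {
    this.listener = listener;
  }

  /** Invokes the listener once a job reaches a terminal state. */
  void onStateChange(String jobId, State state) {
    if (state == State.DONE || state == State.FAILED) {
      try {
        listener.onTermination(jobId);
      } catch (Exception e) {
        // Cleanup failures are reported but do not fail the state update.
        System.err.println("Cleanup failed for " + jobId + ": " + e);
      }
    }
  }

  /** The flag lives inside the listener, so the service needs no boolean. */
  static JobTerminationListener cleanupListener(boolean cleanPerJob, List<String> removed) {
    return jobId -> {
      if (cleanPerJob) {
        removed.add(jobId); // stands in for deleting the job's staged artifacts
      }
    };
  }
}
```

With this shape the service has a single code path, and turning cleanup on or off is purely a matter of which listener the caller constructs.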
}

private final ConcurrentMap<String, JobPreparation> preparations;
private final ConcurrentMap<String, JobInvocation> invocations;
private final ConcurrentMap<String, String> stagingSessionTokens;
We should also clean up the stagingSessionTokens on job completion.
ok; I've made it so that we always clean them up on job termination, regardless of whether a cleanup hook is set.
my understanding is that we have no plans to use them beyond a job's life-span, so this falls under "not leaking memory from this map".
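The behavior described in this reply could look roughly like the following sketch, which uses only `java.util.concurrent` and is not the PR's code: the map entry is always removed when a job terminates, and the cleanup hook runs only if one was configured. `TokenRegistry` and its method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/** Hypothetical sketch; these names do not come from the PR. */
class TokenRegistry {
  private final ConcurrentMap<String, String> stagingSessionTokens = new ConcurrentHashMap<>();
  private final Consumer<String> cleanupHook; // null when no cleanup is configured

  TokenRegistry(Consumer<String> cleanupHook) {
    this.cleanupHook = cleanupHook;
  }

  void register(String preparationId, String token) {
    stagingSessionTokens.put(preparationId, token);
  }

  /** Always drops the map entry; runs the hook only if one was set. */
  void onJobTerminated(String preparationId) {
    String token = stagingSessionTokens.remove(preparationId);
    if (token != null && cleanupHook != null) {
      cleanupHook.accept(token);
    }
  }

  int size() {
    return stagingSessionTokens.size();
  }
}
```

Using `remove` rather than `get` means the token is dropped from the map in the same step that hands it to the hook, so a terminated job cannot leave an entry behind.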
@@ -305,7 +311,7 @@ public void onCompleted() {
    * Serializable StagingSessionToken used to stage files with {@link
    * BeamFileSystemArtifactStagingService}.
    */
-  private static class StagingSessionToken implements Serializable {
+  static class StagingSessionToken implements Serializable {
We should keep the StagingSessionToken private as no one else should care about its structure outside this class.
done
  return loadManifest(manifestResourceId);
}

public static ProxyManifest loadManifest(ResourceId manifestResourceId) throws IOException {
Let's keep it package-private, as BeamFileSystemArtifactRetrievalService is the only other user.
done
LOG.info("Removing manifest: {}", manifestResourceId);
FileSystems.delete(Collections.singletonList(manifestResourceId));
LOG.info("Removing empty dir: {}", dir);
FileSystems.delete(Collections.singletonList(dir));
We should check if the directory is empty or not.
If it's not, currently we'll get a DirectoryNotEmptyException, which seems basically desirable?
afaict FileSystems offers no way to recursively delete, cf. BEAM-4843
for now, it seems like throwing an exception if the structure has changed from what is assumed here is the correct behavior, to me
DirectoryNotEmptyException is good enough.
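To illustrate the behavior this thread settles on, here is a small sketch using `java.nio.file` directly; it is an analogy, not Beam's `FileSystems` API, and the thread above is the only source for how `FileSystems.delete` behaves. `StagingDirCleanup` and `removeKnownFilesThenDir` are hypothetical names. The idea is to delete only the files the code knows about and then delete the directory non-recursively, so that any unexpected leftover surfaces as an exception instead of being silently removed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical sketch using java.nio.file, not Beam's FileSystems API. */
class StagingDirCleanup {
  /** Deletes the known files, then the directory itself. */
  static void removeKnownFilesThenDir(Path dir, List<Path> knownFiles) throws IOException {
    for (Path file : knownFiles) {
      Files.delete(file);
    }
    // Non-recursive delete: on the default file system this throws
    // DirectoryNotEmptyException if anything unexpected is still inside.
    Files.delete(dir);
  }
}
```

Letting the exception propagate keeps the assumption about the directory layout explicit: if the layout ever changes, cleanup fails loudly.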
fcd8069
to
7ca46ab
Compare
I think I responded to everything.
I added test logic to BeamFileSystemArtifactServicesTest and InMemoryJobServiceTest that covers most of what's new here.
However, some of the plumbing for the directory-removal callback happens in FlinkJobServerDriver, which doesn't have its own test atm.
I looked at adding one but it feels like it gets into IT / VR test territory a bit, or would want a TestPortableRunner, so I held off on that for now, but if you'd like me to forge ahead with one of those approaches lmk!
7ca46ab
to
d3c9465
Compare
Run Java PreCommit |
First Jenkins build failed with a lot of messages like:
and eventually:
Now I'm seeing the first message in the latest build as well. Any ideas what that's about are welcome! |
Run Java PreCommit |
Same thing 3x here, not sure how to triage. Seeing a timed out build on #6018 with a lot of OOMs but not the |
Thanks!
configuration.cleanArtifactsPerJob ?
    (String stagingSessionToken) ->
        artifactStagingService.getService().removeArtifacts(stagingSessionToken)
    : null,
This will throw a NullPointerException on the accept call if configuration.cleanArtifactsPerJob is false.
I was suggesting:
(String stagingSessionToken) -> { if (configuration.cleanArtifactsPerJob) artifactStagingService.getService().removeArtifacts(stagingSessionToken); }
"This will throw NullPointer on accept call if configuration.cleanArtifactsPerJob is false."
I don't think that's true, as written? I check cleanupJobFn != null in the only place it's referenced in InMemoryJobService.
Or maybe I'm missing what you mean?
Anyway, if you think this way is cleaner I'm almost fine with it, lmk!
I changed it to what you suggested, anyway
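The two options debated in this thread can be compared side by side in a small sketch. This is plain Java with `java.util.function.Consumer` standing in for the PR's `ThrowingConsumer`, and `CleanupHooks`, `nullableHook`, and `guardedHook` are hypothetical names. The first variant is safe only as long as every call site null-checks the hook; the second always returns a usable hook and moves the flag check inside it.

```java
import java.util.function.Consumer;

/** Hypothetical sketch; removeArtifacts here is a stand-in, not Beam's method. */
class CleanupHooks {
  /** Variant A: a null hook means "no cleanup"; every caller must null-check. */
  static Consumer<String> nullableHook(boolean cleanPerJob, Consumer<String> removeArtifacts) {
    return cleanPerJob ? removeArtifacts : null;
  }

  /** Variant B: always non-null; the flag is checked inside the lambda. */
  static Consumer<String> guardedHook(boolean cleanPerJob, Consumer<String> removeArtifacts) {
    return token -> {
      if (cleanPerJob) {
        removeArtifacts.accept(token);
      }
    };
  }
}
```

Both variants behave the same when the single call site guards against null, which is the author's point; the guarded variant simply removes that obligation from future callers.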
Merged in |
whew, finally passed precommit. lmk if you want me to squash+rebase @angoenka (or if there are any other comments I missed! I think I got everything) |
The PR looks good. |
@ryan-williams Is the PR good for merge? |
aae17aa
to
ebd733d
Compare
I removed the terminationListener per your suggestion @angoenka; it should be ready to go! |
try {
  return MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
  LOG.error("Error {} occurred while serializing {}.", e.getMessage(), this);
Why log here rather than throw a StatusRuntimeException, as shown below?
good catch, done
} catch (JsonProcessingException e) {
  LOG.error("Error {} occurred while serializing {}.", e.getMessage(), stagingSessionToken);
  throw e;
LOG.info("Removing dir {}", dir);
Do we really need this and the following logging at info level? Why not debug?
good point; I made just the last one info and the rest debug, lmk if you want them all as debug
thanks @tweise, I think I addressed both comments (and hopefully the CI issue)
Thanks for the spotless fix; I was mistakenly only running it on I see another spotless nit locally, will push it if this build fails (which it presumably will?) |
1d4a880
to
3711dbf
Compare
3711dbf
to
9c7130e
Compare
FYI this can be avoided by running |
Run Java Precommit |
when set, the InMemoryJobService subscribes to invoked jobs' state-changes, and when it sees them complete, removes all associated artifacts
A few incidental moves/changes:
BeamFileSystemArtifactRetrievalService.loadManifest publicly
StagingSessionToken JSON serde into methods
R: @angoenka