-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Race in jar upload during hadoop indexing #1815
Conversation
uploadJar(jarFile, intermediateHdfsPath, fs); | ||
try { | ||
log.info("Renaming jar to path[%s]", hdfsPath); | ||
fs.rename(intermediateHdfsPath, hdfsPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this races can it leave intermediateHdfsPath stale in the distributed FS?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the intermediate path is in job working directory which gets cleaned up during job cleanup.
Also, fixed it to clean the intermediate jar file here.
729c33c
to
bf54991
Compare
throw e; | ||
} | ||
} finally { | ||
if (fs.exists(intermediateHdfsPath)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exists and delete can throw IOException, you'll want to catch those and at least log them. This is a bit tricky
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ideally the exceptions would bubble up as suppressed exceptions, but that may not be possible to do in a clean way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
f86b5f6
to
31bf270
Compare
} | ||
} | ||
DistributedCache.addFileToClassPath(hdfsPath, conf, fs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DistributedCache is deprecated, can you use job.addFileToClassPath
to add the files?
Yes, right now that seems to do exactly this under the hood, but if hadoop ever gets their crap together they can change this without warning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@nishantmonu51 @drcrallen looks like this change is a little less trivial than originally thought. Should we push it back to 0.8.3 / 0.9.0? |
public static void cleanup(Job job) throws IOException | ||
{ | ||
final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory()); | ||
final FileSystem fs = jobDir.getFileSystem(job.getConfiguration()); | ||
fs.delete(jobDir, true); | ||
fs.delete(getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method could use some extra assurances / retries as well. (non-blocking)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to do this for cleanup for other Hadoop Jobs as well, would like to do it in a separate PR. Opening a github issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xvrl I'm actually ok with it right now. I would love to have some more improvements on it and Unit tests, which can come in 0.9.x but at this point I think it is in a much better place than it was, and I'm 👍 if @nishantmonu51 is willing to add some more assurances and unit tests for 0.9 |
I tried to write a Unit test for above, but the JobHelper.setupClasspath skips the jar copying if the fileSystem is LocalFileSystem, so i tested it manually with local HDFS setup. |
btw, I am fine with moving this to 0.9/0.8.3 also. |
@nishantmonu51 io.druid.segment.loading.HdfsFileTimestampVersionFinderTest#setupStatic sets up a mini hdfs cluster for actual hdfs testing. |
@drcrallen thanks, I will use that and add a Test. |
/** | ||
* Uploads jar files to hdfs and configures the classpath. | ||
* Snapshot jar files are uploaded to intermediateClasspath and not shared across multiple jobs. | ||
* Non-Snapshot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non-Snapshot? Is this incomplete sentence?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
fa3753f
to
fca3250
Compare
@drcrallen added test. |
} | ||
return apply(input.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addJarToClassPath has
throw new ISE("File does not exist even after moving from[%s] to [%s]", intermediateHdfsPath, hdfsPath);
that means it won't be retried. Is that intentional?
why not retry here irrespective of what the exception type is, in the worst case there would be 3 retries instead of having to do a recursion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will modify it to IOException though this should ideally never happen in any case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
fca3250
to
3636aa9
Compare
@@ -79,6 +79,8 @@ | |||
private static final int NUM_RETRIES = 8; | |||
private static final int SECONDS_BETWEEN_RETRIES = 2; | |||
private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB | |||
private static final Pattern SNAPSHOT_JAR = Pattern.compile(".*SNAPSHOT(-selfcontained)?\\.jar$"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this should be
Pattern.compile(".*\\-SNAPSHOT(-selfcontained)?\\.jar$")
just in case someone was crazy enough to have artifact name be something like iAmNotSNAPSHOT.jar , it is not a "snapshot" jar really.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
3636aa9
to
62bef6b
Compare
LGTM |
if (exception != null) { | ||
exception.addSuppressed(e); | ||
} else { | ||
exception = e; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this just making sure we don't have a jar left over at intermediateHdfsPath
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see :) Can we also invert the condition in "if else"? I actually feel it takes a bit less mind gymnastics to understand if it was
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
{ | ||
Job job = Job.getInstance(conf, "test-job"); | ||
DistributedFileSystem fs = miniCluster.getFileSystem(); | ||
Path intermediatePath = new Path("/tmp/classpath"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Intermediate path should be unique per task, right? can this be randomized a bit so one test cannot pollute another?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, this is on hdfs, diff runs will have diff hdfs directory.
@nishantmonu51 is there a way to add tests for failure cases? |
int id = barrier.await(); | ||
Job job = Job.getInstance(conf, "test-job-" + id); | ||
Path intermediatePathForJob = new Path(intermediatePath, "job-" + id); | ||
barrier.await(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure why we need to await() for the second time?
9ba4f83
to
41cc078
Compare
I added a test for concurrent upload which tries to simulate the failure case which this PR is intended to fix. |
); | ||
} | ||
|
||
for (Future<Boolean> future : futures) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest Futures.allAsList(futures).get(....)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then you don't really need to check the return value because it should bubble up any exceptions in the execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice suggestion, done.
d457699
to
31097f5
Compare
👍 |
{ | ||
hdfsTmpDir = File.createTempFile("hdfsClasspathSetupTest", "dir"); | ||
hdfsTmpDir.deleteOnExit(); | ||
if (!hdfsTmpDir.delete()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is hdfsTmpDir.deleteOnExit()
needed when hdfsTmpDir.delete()
is ensured?
Also, it is safer to use @rule TemporaryFolder instead as that will do cleanup as soon as test is done instead of jvm exit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried using TemporaryFolder, but we cant use it as a static field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm, if this is to be done only once for all the tests here then we can probably make things non static and use a no argument constructor to do the setup? Is there a reason for hdfsTmpDir to be static?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can use a non arg constructor, but still i will need to make the fields static since i will have to shut it down in a static afterClass method. I think it will be wierd to initialize static fields of a class in constructor rather than a BeforeClass method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, i see, junit unfortunately does not have anything to setup/teardown "instance" .
few fixes delete intermediate file early better exception handling use static pattern instead of compiling it every time Add retry for transient exceptions remove usage of deprecated method. Add test fix imports fix javadoc review comment. review comment: handle crazy snapshot naming review comments remove default retry count in favour of already present constant review comment make random intermediate and final paths. review comment, use temporaryFolder where possible
31097f5
to
3641a0e
Compare
Fix Race in jar upload during hadoop indexing
fixes Race while uploading jar files.
Fixes - #582