MAPREDUCE-7435. Manifest Committer OOM on abfs #5519
Conversation
Force-pushed from c0fc290 to 720f120
Thanks, @steveloughran. This mostly looks good. I entered a few comments.
@Override
public void setMeanStatistic(final String key, final MeanStatistic value) {
meanStatistics().put(key, value);
?
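That is, a minimal sketch of the setter body being suggested, assuming the store exposes its mean-statistics map through the existing `meanStatistics()` accessor:

```java
@Override
public void setMeanStatistic(final String key, final MeanStatistic value) {
  // replace or insert the mean statistic for this key
  meanStatistics().put(key, value);
}
```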
public static void addHeapInformation(IOStatisticsSetters ioStatisticsSetters,
    String stage) {
  // force a gc. bit of bad form but it makes for better numbers
  System.gc();
This triggered a Spotbugs warning. Do you think the forced GC should go behind a config flag, default off, and turned on in the tests?
yes, I will do that
GC pulled out of production code & only invoked in test code
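A minimal sketch of that resolution (hypothetical test helper, not the patch's actual code): the forced GC stays in test code, right before sampling the heap.

```java
// test-side helper: force a GC so heap numbers are more stable,
// keeping System.gc() out of the production addHeapInformation() path
static long usedHeapAfterGc() {
  System.gc();  // bad form in production; tolerable in a test
  final Runtime rt = Runtime.getRuntime();
  return rt.totalMemory() - rt.freeMemory();
}
```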
// needed to avoid this test contaminating others in the same JVM
FileSystem.closeAll();
conf.set(fileImpl, fileImplClassname);
conf.set(fileImpl, fileImplClassname);
Duplicated line?
I wasn't sure why we need to set the conf here in the finally block. Did something mutate it after line 761, and now we need to restore it?
duplicate. cut
The reason it is in is not so much any change in the PR as a condition it surfaced which was already there: this test changes the default "file" fs, and in some test runs that wasn't being reset, so other tests were failing later for no obvious reason.
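For illustration, the cleanup pattern being described might look like this sketch, using the names from the quoted diff (test body elided):

```java
try {
  // ... test body which rebinds the default "file" filesystem ...
} finally {
  // restore the original binding and purge cached instances so later
  // tests in the same JVM see a clean slate
  conf.set(fileImpl, fileImplClassname);
  FileSystem.closeAll();
}
```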
@@ -63,6 +81,10 @@ public void setup() throws Exception {
      .isGreaterThan(0);
  }

  public long heapSize() {
    return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
Nitpick: some indentation issues here.
success.save(summaryFS, path, true);
LOG.info("Saved summary to {}", path);
ManifestPrinter showManifest = new ManifestPrinter();
ManifestSuccessData manifestSuccessData =
Nitpick: some indentation issues here.
@cnauroth - thanks for the comments; will update. I've converted this to a draft as I am working on the next step of this: streaming the list of files to rename from each manifest into a SequenceFile saved to the local FS; the rename stage reads that in and spreads the renames across the worker pool, maybe in batches. This will eliminate the need to store the list of files to rename in memory at all, so there is no need to worry about the number of files or path lengths. The file will be on the local FS, so on an SSD machine fairly quick to write and read back, especially if the OS buffers well/is optimised for transient files.
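As a rough sketch of that design, assuming `FileEntry` is the committer's Writable record type and with a hypothetical local path and entry source, the spill-and-stream cycle with Hadoop's `SequenceFile` API could look like:

```java
Configuration conf = new Configuration();
Path entryFile = new Path("file:///tmp/job-0001-renames.seq"); // hypothetical

// manifest-load phase: append each entry instead of keeping it on the heap
try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
    SequenceFile.Writer.file(entryFile),
    SequenceFile.Writer.keyClass(NullWritable.class),
    SequenceFile.Writer.valueClass(FileEntry.class))) {
  for (FileEntry entry : entriesFromOneManifest) { // hypothetical source
    writer.append(NullWritable.get(), entry);
  }
}

// rename stage: stream the entries back and hand them to the worker pool
try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
    SequenceFile.Reader.file(entryFile))) {
  FileEntry entry = new FileEntry();
  while (reader.next(NullWritable.get(), entry)) {
    // submit the rename described by entry to the worker pool, maybe batched
  }
}
```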
💔 -1 overall
This message was automatically generated.
Force-pushed from 2d0dcce to de8c6e5
Sounds good. I'll wait to hear from you when you want code review again. Thanks, Steve.
Force-pushed from de8c6e5 to 1358391
updated pr has been run through azure, with stats of a terasort being
once abfs adds iostats context update of input stream reads, we could collect and add that into the stats too; not worrying about it until then.
tested azure cardiff, which is a slow test run today (no network). It would be nice to move the LoadManifests test into the parallel bit of the test run, but trying to do it seems to blow up too much stuff (the abfs parallel test phase runs individual test cases in parallel, see)
parallel test running failed everywhere, but I have improved ITestAbfsLoadManifestsStage performance
brings test time down to 10s locally. IOStats does imply many MB of data is being PUT/GET.
testrun failure is the usual intermittent failure of the slow tests, showing some transient failure happened and was recovered from. wifi was playing up all evening
spotbugs complaint is valid, but nothing to do with this PR
Force-pushed from 039648b to b5166b6
  throw new UncheckedIOException(e);
} catch (InterruptedException e) {
  // being stopped implicitly
  LOG.debug("interrupted", e);
set stop
done in finally
Looks really interesting and well-written.
Have started to look at some bits of core functionality, moving towards tests afterwards.
addHeapInformation(heapInfo, "setup");
// load the manifests
final StageConfig stageConfig = getStageConfig();
LoadManifestsStage.Result result = new LoadManifestsStage(stageConfig).apply(
suggestion: We can include a duration tracker to know the time taken to load manifests in the final stats.
already done in AbstractJobOrTaskStage
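For reference, that pattern — wrapping stage execution in a duration tracker so the elapsed time lands in the statistics — could look roughly like this sketch (`OP_LOAD_ALL_MANIFESTS` and `arguments` are assumed names; the stage's statistics store is assumed to act as the `DurationTrackerFactory`):

```java
// IOStatisticsBinding.trackDuration runs the callable and records its
// elapsed time against the named statistic
LoadManifestsStage.Result result = trackDuration(getIOStatistics(),
    OP_LOAD_ALL_MANIFESTS, () ->
        new LoadManifestsStage(stageConfig).apply(arguments));
```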
}
if (active.get()) {
  try {
    queue.put(new QueueEntry(Actions.write, entries));
Suggestion: We could use `queue.offer(E e, long timeout, TimeUnit unit)`, so that we wait for the queue to have capacity to add the entry while also having a timeout in case something goes wrong. We can catch the interrupt and throw/swallow accordingly if it exceeds the timeout?
hmmm. I would rather have the writer work. If we have a timeout here it should be really big, as we want to cope with all threads blocking for a while.
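A sketch of the bounded-put compromise that was eventually adopted (constant name taken from the final patch; interrupt handling simplified):

```java
// block, but not forever: a wedged queue should fail the job, not hang it
boolean queued = queue.offer(new QueueEntry(Actions.write, entries),
    WRITER_QUEUE_PUT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (!queued) {
  throw new IOException("Timed out queueing " + entries.size() + " entries");
}
```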
if (active.get()) {
  try {
    queue.put(new QueueEntry(Actions.write, entries));
    LOG.debug("Queued {}", entries.size());
Some info about the entry that was queued in the LOG?
it's a list...what do we want to add?
* @param path path to create
* @return true if dir created/found
* @param dirEntry dir to create
* @return Outcome
nit: Better javadocs for return, "State of the directory in the dir map" or something?
* This is primarily for tests or when submitting work into a TaskPool.
* equivalent to
* <pre>
* for(long l = start, l < finis; l++) yield l;
typo: "l < excludedFinish"
Thread.currentThread().setName("EntryIOWriter");
try {
  while (!stop.get()) {
    final QueueEntry queueEntry = queue.take();
Seems like we could wait indefinitely on this. How about `poll(long timeout, TimeUnit unit)`?
I don't know how long we should wait here? It assumes that yes, the caller will eventually stop the write.
fixed to 10 minutes, returning false. caller gets to react (which it will do by raising an IOE but giving anything raised by the writer thread priority)
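Sketched on the consuming side (simplified; loop body elided; constant name assumed):

```java
while (!stop.get()) {
  // bounded take: a null return means the poll timed out
  final QueueEntry queueEntry =
      queue.poll(WRITER_QUEUE_PUT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
  if (queueEntry == null) {
    return false;  // caller reacts, normally by raising an IOException
  }
  // ... write the queued entries ...
}
```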
// signal queue closure by queuing a stop option.
// this is added at the end of the list of queued blocks,
// of which are written.
try {
LOG something like "Tasks left in queue = capacity - queue.remainingCapacity()" for better logging. We could do something like this while offering as well, but it seems apt for close().
It can help limit the amount of memory consumed during manifest load during
job commit.
The maximumum number of loaded manifests will be
typo: "maximum"
* Add heap information to gauges in _SUCCESS, which includes pulling up part of impl.IOStatisticsStore into a public IOStatisticsSetters interface with the put{Counter, Gauge, etc} methods only.
* TestLoadManifests scaled up with #of tasks and #of files in each task to generate more load.
* Code to build up dir map during load phase; not wired up.

Summary: abfs uses a lot more heap during the load phase than file, possibly due to buffering, but doing a pipeline for processing the results isn't sustainable. Either two-phase:
* phase 1: build up the dir list, discard manifests after each load
* phase 2: load manifests and rename incrementally
Or: unified, with some complicated directory creation process to ensure that each task's dirs exist before its rename begins.

Change-Id: I8595c083435e3d4df27343599687677abfc1c013
* DirEntry and FileEntry are writeable.
* LoadManifests to take a path to where to cache the rename list; not yet wired up.

Change-Id: Ibd992b179bd0bcf26a39ae4ce5407257ecbfcb10
This is a big change, with tests so far verifying that core read/write is happy. Current state: simple read/write good, async queue not yet tested.

Change-Id: I7cb1443024780b355a8f3bb96fbfe08d8608d968
interim commit

Change-Id: I80bb4e72c1029baad8fb87d8c9287b08c0b000f4
...but not tested the job commit yet.

Change-Id: I0f54ede94e41592558468df1c87f4a39d2461223
...but not tested the job commit yet.

Change-Id: I4d50636542673a3f25a7ab363df1b1bd221216ae
* TestEntryFileIO extended for this.
* ABFS terasort test happy!

Change-Id: I068861973114d9947f3d22eaf32a6ee3b7ca8fa2

TODO: fault injection on the writes
* Validation also uses manifest entries (and so works!).
* Testing expects this.
* Tests of IOStats.
* Tests of new RemoteIterators.

Change-Id: I4cfb308d4b08f1f775cfdbe2df6f8ff07ac6bc54
Change-Id: I2008d31bff3af59396a04dddc1b9357b1a812294
* Moved RangeExcludingLongIterator into RemoteIterators, added test.
* Address checkstyle.
* Address spotbugs.
* Address deprecation.
* ValidateRenameFilesStage doesn't validate etags on wasb; helps address a JIRA about hadoop-azure testing.

Change-Id: Id6507d79f8d3cfa434afb65bfe9fc7539a7c1cf5
* Back to the original 200 manifest files.
* Increase worker pool and buffer queue size (more significant before reducing the manifest count).

Brings test time down to 10s locally. IOStats does imply many MB of data is being PUT/GET, so it is good to keep small so people running with less bandwidth don't suffer. Maybe, maybe, the size could switch with a -Dscale?

Change-Id: I49d201d7af7434797ab6fff5831a0f899c5c4185
…leanup

Change-Id: If043263676c4d5694065e7ec35954a7f66c04d90
ok, rebasing and pushing up with a commit to address most of the changes. Not addressed: having timeouts on the offer/take of the EntryWriter in the queue. I agree it is safest if we do add a timeout here, just as an emergency.
Change-Id: Ib93ba8ba632135a05da126a75f34e78bd381cf2a
Force-pushed from b5166b6 to 8e83fdc
latest patch tested against azure cardiff, timeout unrelated
I wonder if we should just increase the timeout there?
@mehakmeet when you get time, can you review this? I would like to get this in before I forget about it.
I think this is very close to getting merged. Had one doubt with respect to the testing: was the large load of manifests run against the old way of handling manifest files, to see the OOM errors? Tests look good overall; just wanted to know whether that limit was being hit and is now rectified with the new approach.
// do an explicit close to help isolate any failure.
SequenceFile.Writer writer = createWriter();
writer.append(NullWritable.get(), source);
writer.flush();
just a doubt here. Do we need to explicitly flush the writer before closing? Won't that be done in close too? If yes, we can test both mechanisms by just saying writer.close()
just being rigorous. on hdfs close() doesn't actually sync the data, just flushes it, FWIW.
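On the HDFS point: `close()` flushes client-side buffers but does not guarantee durability; the `Syncable` calls do. A quick illustration against `FSDataOutputStream` (not the test's code):

```java
out.flush();   // flush local buffers into the stream
out.hflush();  // push data to the datanodes; visible to new readers
out.hsync();   // like hflush(), plus a flush to disk on each datanode
out.close();   // flushes, but does not hsync
```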
// now use the iterator to access it.
List<FileEntry> files = new ArrayList<>();
Assertions.assertThat(foreach(iterateOverEntryFile(), files::add))
    .isEqualTo(0);
Can you clarify which value equates to "0" in this test by some comments?
added a description
    .isEqualTo(2);

// unknown value
ioStatistics.setCounter("c2", 3);
how about we assert that this unknown counter shouldn't exist in the counters map? Just to test the no-op, I guess.
let's see...
had to parameterize the test so we assert that on snapshots they do accrue, but on the other impls they don't
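Roughly what that parameterization asserts, sketched against `IOStatisticsSnapshot`, which holds its statistics in dynamic maps (assertion style assumed):

```java
IOStatisticsSnapshot snapshot = new IOStatisticsSnapshot();
snapshot.setCounter("c2", 3);  // snapshots are dynamic: the key is created
Assertions.assertThat(snapshot.counters()).containsEntry("c2", 3L);
// on a store created with a fixed set of keys, the same call is a no-op
```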
@@ -141,6 +145,9 @@
   */
  private StageConfig ta11Config;

  private LoadedManifestData
javadocs for consistency.
done. also needed to be static to work properly, so doc that after fixing it
@@ -82,6 +85,8 @@ public class TestRenameStageFailure extends AbstractManifestCommitterTest {
  /** resilient commit expected? */
  private boolean resilientCommit;

  private EntryFileIO entryFileIO;
javadocs for consistency.
That is: the success file contains entries which aren't present in the FS.

Fixes:
* Find the bit in an earlier test where the file was being deleted, and restore it (and re-order it too!).
* LoadManifestsStage doesn't optionally return manifests for testing; tests modified to match.
* EntryFileIO will report a timeout after 10 minutes if the queue blocks somehow.
* LoadManifestsStage handles this timeout and will raise it as a failure, but only secondary to any exception raised by the writer thread.
* SUCCESS file can be configured with #of files to list; allows tests to assert on many thousands of files, although in production it is still fixed to a small number for performance reasons.

Change-Id: I642c1178928de427bf6e09f0fe0d345876311fb5
Change-Id: Ica813c6068eca18d83bf2f5f94fac4a1e1996c36
🎊 +1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
The other new ones are related to test methods whose numbering breaks the style checker's requirements:
* test_0440_validateSuccessFiles
* test_0450_validationDetectsFailures

Change-Id: I36267e4d9912873e457126341385f866acd6d148
🎊 +1 overall
This message was automatically generated.
@mehakmeet @cnauroth need a final review here. we also need a google gcs test suite somewhere, don't we?
LGTM, one doubt about new constants, else we're good to go in with this. Really nice implementation btw.
public static final int WRITER_SHUTDOWN_TIMEOUT_SECONDS = 60;

/**
 * How long should trying to queue a write block before giving up
 * with an error?
 * This is a safety feature to ensure that if something has gone wrong
 * in the queue code the job fails with an error rather than just hangs.
 */
public static final int WRITER_QUEUE_PUT_TIMEOUT_MINUTES = 10;
Sorry, I think I missed these constants being added. Don't you think these should be configurable, just as a fallback, so that these values never cause any issues and are easily changeable? I guess if it waits this long then we can assume it's just hanging as well. Your call on whether it should be configurable.
my view: if things are this bad it is a disaster and the job is failing anyway, as either the thread concurrency is broken or the local fs has failed.
merged to trunk, now backporting to 3.3
This modifies the manifest committer so that the list of files to rename is passed between stages as a file of writeable entries on the local filesystem.

The map of directories to create is still passed in memory; this map is built across all tasks, so even if many tasks created files, if they all write into the same set of directories the memory needed is O(directories), with the task count not a factor.

The _SUCCESS file reports on heap size through gauges. This should give a warning if there are problems.

Contributed by Steve Loughran

Change-Id: Ic7707d2dde9daa28cd3a927e49972c15313336ad
Summary: abfs uses a lot more heap during the load phase than file; possibly due to buffering.