MAPREDUCE-7435. Manifest Committer OOM on abfs #5519

Merged
Commits (18)
17db68e - MAPREDUCE-7435. Manifest Committer OOM on abfs (steveloughran, Mar 29, 2023)
e153791 - MAPREDUCE-7435. committer OOM (steveloughran, Mar 31, 2023)
16422e4 - MAPREDUCE-7435. oom: switch to sequence file for storage of the files. (steveloughran, Apr 4, 2023)
f95a926 - MAPREDUCE-7435. oom: switch to sequence file for storage of the files. (steveloughran, Apr 5, 2023)
3046b08 - MAPREDUCE-7435. starting to get write/read chain working (steveloughran, Apr 5, 2023)
3182c97 - MAPREDUCE-7435. starting to get write/read chain working (steveloughran, Apr 5, 2023)
f18e9da - MAPREDUCE-7435. Async queue/write working (steveloughran, Apr 6, 2023)
30d90e0 - MAPREDUCE-7435. following chain through to validation (steveloughran, Apr 17, 2023)
ed04b54 - MAPREDUCE-7435. Parallel writing test (steveloughran, Apr 18, 2023)
61a6846 - MAPREDUCE-7435. checkstyle, remote iterator work, azure tuning (steveloughran, Apr 19, 2023)
ffb25e7 - MAPREDUCE-7435. improve ITestAbfsLoadManifestsStage performance (steveloughran, Apr 19, 2023)
9874acb - MAPREDUCE-7435. tweak test performance by disabling parallel TA dir c… (steveloughran, Apr 19, 2023)
7af5ab2 - MAPREDUCE-7435. reduce delete overhead on renaming by a HEAD (steveloughran, Apr 19, 2023)
f969993 - MAPREDUCE-7435. javadocs and other warnings, *not spotbugs* (steveloughran, Apr 20, 2023)
8e83fdc - MAPREDUCE-7435. Mehakmeet comments, excluding timeouts. (steveloughran, Apr 26, 2023)
b289707 - MAPREDUCE-7435. validation reporting missing files (steveloughran, Jun 1, 2023)
355fa35 - MAPREDUCE-7435. Mehakmeet review (steveloughran, Jun 1, 2023)
070c788 - MAPREDUCE-7435. checkstyle: remove unused imports. (steveloughran, Jun 6, 2023)
@@ -63,7 +63,18 @@ public class EntryFileIO {
private static final Logger LOG = LoggerFactory.getLogger(
EntryFileIO.class);

- public static final int WRITER_SHUTDOWN_TIMEOUT = 60;
+ /**
+  * How long should the writer shutdown take?
+  */
+ public static final int WRITER_SHUTDOWN_TIMEOUT_SECONDS = 60;
+
+ /**
+  * How long should trying to queue a write block before giving up
+  * with an error?
+  * This is a safety feature to ensure that if something has gone wrong
+  * in the queue code, the job fails with an error rather than just hangs.
+  */
+ public static final int WRITER_QUEUE_PUT_TIMEOUT_MINUTES = 10;
Comment on lines +69 to +77

Contributor:
Sorry, I think I missed these constants being added. Shouldn't they be configurable, as a fallback, so that if these values ever cause issues they are easy to change? I guess if it waits this long, we can assume it is just hanging anyway. Your call on whether to make them configurable.

Contributor Author (steveloughran):
My view: if things are this bad it is a disaster, and the job is failing anyway; either the thread concurrency is broken or the local FS has failed.
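For readers skimming the thread: a minimal JDK-only sketch of the bounded-put pattern these constants back (class and field names here are illustrative, not the committer's own API). `offer()` with a timeout fails fast where `put()` could block forever:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedEnqueueSketch {

  /** Mirrors WRITER_QUEUE_PUT_TIMEOUT_MINUTES: the bound on a blocking put. */
  private static final int PUT_TIMEOUT_MINUTES = 10;

  private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);

  /**
   * Queue an entry, giving up after the timeout rather than blocking forever.
   * A false return means the caller should fail the job instead of hanging.
   */
  public boolean enqueue(String entry) throws InterruptedException {
    // offer() waits at most the timeout; put() would wait indefinitely
    return queue.offer(entry, PUT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
  }
}
```

The tradeoff matches the reply above: a fixed, generous timeout turns a hung queue into a job failure without adding another configuration knob.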


/** Configuration used to load filesystems. */
private final Configuration conf;
@@ -327,15 +338,20 @@ public boolean enqueue(List<FileEntry> entries) {
}
if (active.get()) {
try {
- queue.put(new QueueEntry(Actions.write, entries));
- LOG.debug("Queued {}", entries.size());
- return true;
+ LOG.debug("Queueing {} entries", entries.size());
+ final boolean enqueued = queue.offer(new QueueEntry(Actions.write, entries),
+ WRITER_QUEUE_PUT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+ if (!enqueued) {
+ LOG.warn("Timeout submitting entries to {}", this);
+ }
+ return enqueued;
} catch (InterruptedException e) {
Thread.interrupted();
return false;
}
} else {
LOG.warn("EntryFile write queue inactive; discarding {} entries", entries.size());
LOG.warn("EntryFile write queue inactive; discarding {} entries submitted to {}",
entries.size(), this);
return false;
}
}
@@ -424,7 +440,7 @@ public void close() throws IOException {
}
try {
// wait for the op to finish.
- int total = FutureIO.awaitFuture(future, WRITER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
+ int total = FutureIO.awaitFuture(future, WRITER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
LOG.debug("Processed {} files", total);
executor.shutdown();
} catch (TimeoutException e) {
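A sketch of the bounded-shutdown pattern used in close() above, assuming the org.apache.hadoop.util.functional.FutureIO helper visible in the diff (the timeout value and the task body are illustrative):

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.util.functional.FutureIO;

public class BoundedShutdownSketch {

  public static void main(String[] args) throws IOException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Integer> future = executor.submit(() -> 42);  // stand-in for the writer task
    try {
      // block for at most 60 seconds, then fail rather than hang the job
      int total = FutureIO.awaitFuture(future, 60, TimeUnit.SECONDS);
      System.out.println("processed " + total + " files");
      executor.shutdown();
    } catch (TimeoutException e) {
      // the writer never finished: force shutdown and surface the error
      executor.shutdownNow();
      throw new IOException("timed out waiting for writer shutdown", e);
    }
  }
}
```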
@@ -26,7 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
@@ -35,6 +35,7 @@

import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_BYTES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_FILES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
@@ -84,23 +85,22 @@ protected CommitJobStage.Result executeStage(
LoadManifestsStage.Result result = new LoadManifestsStage(stageConfig).apply(
Contributor:
Suggestion: we could include a duration tracker so the time taken to load the manifests shows up in the final stats.

Contributor Author (steveloughran):
Already done in AbstractJobOrTaskStage.

new LoadManifestsStage.Arguments(
File.createTempFile("manifest", ".list"),
- false, /* do not cache manifests */
+ /* do not cache manifests */
stageConfig.getWriterQueueCapacity()));
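On the duration-tracker exchange above: a minimal sketch of the tracking the author says is already in place, assuming the IOStatisticsBinding/IOStatisticsStore APIs from hadoop-common (the statistic name and the wrapped work are illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;

public class DurationTrackingSketch {

  public static void main(String[] args) throws IOException {
    // a store which tracks durations for the named statistic
    IOStatisticsStore stats = iostatisticsStore()
        .withDurationTracking("op_load_all_manifests")
        .build();

    // min/mean/max durations of the wrapped call land in the store
    trackDurationOfInvocation(stats, "op_load_all_manifests", () ->
        System.out.println("loading manifests..."));  // stand-in for real work

    System.out.println(stats);
  }
}
```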
- LoadManifestsStage.SummaryInfo summary = result.getSummary();
+ LoadManifestsStage.SummaryInfo loadedManifestSummary = result.getSummary();
loadedManifestData = result.getLoadedManifestData();

LOG.debug("{}: Job Summary {}", getName(), summary);
LOG.debug("{}: Job Summary {}", getName(), loadedManifestSummary);
LOG.info("{}: Committing job with file count: {}; total size {} bytes",
getName(),
- summary.getFileCount(),
- String.format("%,d", summary.getTotalFileSize()));
+ loadedManifestSummary.getFileCount(),
+ String.format("%,d", loadedManifestSummary.getTotalFileSize()));
addHeapInformation(heapInfo, OP_STAGE_JOB_LOAD_MANIFESTS);


// add in the manifest statistics to our local IOStatistics for
// reporting.
IOStatisticsStore iostats = getIOStatistics();
- iostats.aggregate(summary.getIOStatistics());
+ iostats.aggregate(loadedManifestSummary.getIOStatistics());

// prepare destination directories.
final CreateOutputDirectoriesStage.Result dirStageResults =
@@ -113,7 +113,9 @@ protected CommitJobStage.Result executeStage(
// and hence all aggregate stats from the tasks.
ManifestSuccessData successData;
successData = new RenameFilesStage(stageConfig).apply(
- Pair.of(loadedManifestData, dirStageResults.getCreatedDirectories()));
+ Triple.of(loadedManifestData,
+ dirStageResults.getCreatedDirectories(),
+ stageConfig.getSuccessMarkerFileLimit()));
if (LOG.isDebugEnabled()) {
LOG.debug("{}: _SUCCESS file summary {}", getName(), successData.toJson());
}
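The Pair-to-Triple switch above is just commons-lang3's immutable tuple growing a third slot; a minimal sketch of that API (the values are illustrative):

```java
import org.apache.commons.lang3.tuple.Triple;

public class TripleSketch {
  public static void main(String[] args) {
    // bundle three stage inputs into one immutable argument object
    Triple<String, Integer, Long> stageArgs =
        Triple.of("manifest-data", 3, 10_000L);
    System.out.println(stageArgs.getLeft());   // manifest-data
    System.out.println(stageArgs.getMiddle()); // 3
    System.out.println(stageArgs.getRight());  // 10000
  }
}
```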
@@ -124,10 +126,10 @@ protected CommitJobStage.Result executeStage(
// aggregating tasks.
iostats.setCounter(
COMMITTER_FILES_COMMITTED_COUNT,
- summary.getFileCount());
+ loadedManifestSummary.getFileCount());
iostats.setCounter(
COMMITTER_BYTES_COMMITTED_COUNT,
- summary.getTotalFileSize());
+ loadedManifestSummary.getTotalFileSize());
successData.snapshotIOStatistics(iostats);
successData.getIOStatistics().aggregate(heapInfo);

@@ -76,11 +76,6 @@ public class LoadManifestsStage extends
*/
private final SummaryInfo summaryInfo = new SummaryInfo();

- /**
- * List of loaded manifests.
- */
- private final List<TaskManifest> manifests = new ArrayList<>();

/**
* Map of directories from manifests, coalesced to reduce duplication.
*/
@@ -91,12 +86,6 @@
*/
private EntryFileIO.EntryWriter entryWriter;

- /**
- * Should the manifests be cached and returned?
- * only for testing.
- */
- private boolean cacheManifests;

public LoadManifestsStage(final StageConfig stageConfig) {
super(false, stageConfig, OP_STAGE_JOB_LOAD_MANIFESTS, true);
}
@@ -117,20 +106,14 @@ protected LoadManifestsStage.Result executeStage(
LOG.info("{}: Executing Manifest Job Commit with manifests in {}",
getName(),
manifestDir);
- cacheManifests = arguments.cacheManifests;

final Path entrySequenceData = arguments.getEntrySequenceData();

// the entry writer for queuing data.
entryWriter = entryFileIO.launchEntryWriter(
entryFileIO.createWriter(entrySequenceData),
arguments.queueCapacity);
- // manifest list is only built up when caching is enabled.
- // as this is memory hungry, it is warned about
- List<TaskManifest> manifestList;
- if (arguments.cacheManifests) {
- LOG.info("Loaded manifests are cached; this is memory hungry");
- }

try {

// sync fs before the list
@@ -142,7 +125,7 @@ protected LoadManifestsStage.Result executeStage(
haltableRemoteIterator(listManifests(),
() -> entryWriter.isActive());

- manifestList = loadAllManifests(manifestFiles);
+ processAllManifests(manifestFiles);
maybeAddIOStatistics(getIOStatistics(), manifestFiles);

LOG.info("{}: Summary of {} manifests loaded in {}: {}",
@@ -158,38 +141,47 @@
entryWriter.maybeRaiseWriteException();

// collect any stats
+ } catch (EntryWriteException e) {
+ // something went wrong while writing.
+ // raise anything on the write thread,
+ entryWriter.maybeRaiseWriteException();
+
+ // falling back to that from the worker thread
+ throw e;
} finally {
+ // close which is a no-op if the clean close was invoked;
+ // it is not a no-op if something went wrong with reading/parsing/processing
+ // the manifests.
entryWriter.close();
}

final LoadedManifestData loadedManifestData = new LoadedManifestData(
new ArrayList<>(directories.values()), // new array to free up the map
entrySequenceData,
entryWriter.getCount());

- return new LoadManifestsStage.Result(summaryInfo, loadedManifestData, manifestList);
+ return new LoadManifestsStage.Result(summaryInfo, loadedManifestData);
}

/**
- * Load all the manifests.
+ * Load and process all the manifests.
* @param manifestFiles list of manifest files.
- * @return the loaded manifests.
- * @throws IOException IO Failure.
+ * @throws IOException failure to load/parse/queue
*/
- private List<TaskManifest> loadAllManifests(
+ private void processAllManifests(
final RemoteIterator<FileStatus> manifestFiles) throws IOException {

trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () ->
TaskPool.foreach(manifestFiles)
.executeWith(getIOProcessors())
.stopOnFailure()
.run(this::processOneManifest));
- return manifests;
}
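processAllManifests above combines duration tracking with TaskPool. A sketch of the TaskPool pattern on its own, assuming org.apache.hadoop.util.functional.TaskPool as used in the diff (item names illustrative); without executeWith() the tasks run sequentially in the calling thread:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.util.functional.TaskPool;

public class TaskPoolSketch {

  public static void main(String[] args) throws IOException {
    List<String> manifests = Arrays.asList("task1.manifest", "task2.manifest");
    TaskPool.foreach(manifests)
        .stopOnFailure()   // abort outstanding work on the first exception
        .run(name -> {
          // per-manifest work; an IOException here fails the whole pass
          System.out.println("processing " + name);
        });
  }
}
```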

/**
* Method invoked to process one manifest.
* @param status file to process.
- * @throws IOException failure to load/parse
+ * @throws IOException failure to load/parse/queue
*/
private void processOneManifest(FileStatus status)
throws IOException {
@@ -200,9 +192,9 @@ private void processOneManifest(FileStatus status)

// update the directories
final int created = coalesceDirectories(manifest);
- final String taskID = manifest.getTaskID();
- LOG.debug("{}: task {} added {} directories",
- getName(), taskID, created);
+ final String attemptID = manifest.getTaskAttemptID();
+ LOG.debug("{}: task attempt {} added {} directories",
+ getName(), attemptID, created);

// add to the summary.
summaryInfo.add(manifest);
Expand All @@ -213,20 +205,12 @@ private void processOneManifest(FileStatus status)
manifest.setIOStatistics(null);
manifest.getExtraData().clear();

- // if manifests are cached add to the list
- if (cacheManifests) {
- // update the manifest list in a synchronized block.
- synchronized (manifests) {
- manifests.add(manifest);
- }
- }

// queue those files.
final boolean enqueued = entryWriter.enqueue(manifest.getFilesToCommit());
if (!enqueued) {
LOG.warn("{}: Failed to write manifest for task {}",
- getName(),
- taskID);
+ getName(), attemptID);
+ throw new EntryWriteException(attemptID);
}

}
@@ -301,21 +285,20 @@ public static final class Arguments {
*/
private final File entrySequenceFile;

- /**
- * build a list of manifests and return them?
- */
- private final boolean cacheManifests;

/**
* Capacity for queue between manifest loader and the writers.
*/
private final int queueCapacity;

- public Arguments(final File entrySequenceFile,
- final boolean cacheManifests,
+ /**
+ * Arguments.
+ * @param entrySequenceFile path to local file to create for storing entries
+ * @param queueCapacity capacity of the queue
+ */
+ public Arguments(
+ final File entrySequenceFile,
final int queueCapacity) {
this.entrySequenceFile = entrySequenceFile;
- this.cacheManifests = cacheManifests;
this.queueCapacity = queueCapacity;
}

@@ -331,37 +314,42 @@ private Path getEntrySequenceData() {
public static final class Result {
private final SummaryInfo summary;

- /**
- * manifest list, non-null only if cacheManifests is true.
- */
- private final List<TaskManifest> manifests;

/**
* Output of this stage to pass on to the subsequence stages.
*/
private final LoadedManifestData loadedManifestData;

- public Result(SummaryInfo summary,
- final LoadedManifestData loadedManifestData,
- final List<TaskManifest> manifests) {
+ /**
+ * Result.
+ * @param summary summary of jobs
+ * @param loadedManifestData all loaded manifest data
+ */
+ public Result(
+ final SummaryInfo summary,
+ final LoadedManifestData loadedManifestData) {
this.summary = summary;
- this.manifests = manifests;
this.loadedManifestData = loadedManifestData;
}

public SummaryInfo getSummary() {
return summary;
}

- public List<TaskManifest> getManifests() {
- return manifests;
- }

public LoadedManifestData getLoadedManifestData() {
return loadedManifestData;
}
}

+ /**
+ * IOE to raise on queueing failure.
+ */
+ public static final class EntryWriteException extends IOException {
+
+ private EntryWriteException(String taskId) {
+ super("Failed to write manifest data for task "
+ + taskId + " to local file");
+ }
+ }
/**
* Summary information.
* Implementation note: atomic counters are used here to keep spotbugs quiet,
@@ -379,6 +367,11 @@ public static final class SummaryInfo implements IOStatisticsSource {
*/
private final List<String> taskIDs = new ArrayList<>();

+ /**
+ * Task attempt IDs.
+ */
+ private final List<String> taskAttemptIDs = new ArrayList<>();

/**
* How many manifests were loaded.
*/
@@ -431,6 +424,10 @@ public List<String> getTaskIDs() {
return taskIDs;
}

+ public List<String> getTaskAttemptIDs() {
+ return taskAttemptIDs;
+ }

/**
* Add all statistics; synchronized.
* @param manifest manifest to add.
@@ -442,6 +439,7 @@ public synchronized void add(TaskManifest manifest) {
directoryCount.addAndGet(manifest.getDestDirectories().size());
totalFileSize.addAndGet(manifest.getTotalFileSize());
taskIDs.add(manifest.getTaskID());
+ taskAttemptIDs.add(manifest.getTaskAttemptID());
}
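A JDK-only sketch of the SummaryInfo pattern noted above (names illustrative): AtomicLong counters keep the numeric aggregation safe for static-analysis tools, while the synchronized add() guards the non-thread-safe lists:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class SummarySketch {

  private final AtomicLong fileCount = new AtomicLong();
  private final AtomicLong totalFileSize = new AtomicLong();
  private final List<String> taskAttemptIDs = new ArrayList<>();

  /** Aggregate one manifest's numbers; synchronized to guard the list. */
  public synchronized void add(int files, long bytes, String attemptID) {
    fileCount.addAndGet(files);
    totalFileSize.addAndGet(bytes);
    taskAttemptIDs.add(attemptID);
  }
}
```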

/**