Skip to content

Commit

Permalink
MAPREDUCE-7435. validation reporting missing files
Browse files Browse the repository at this point in the history
that is: success file contains entries which aren't present in the FS

Fixes
* find bit in earlier test where file was being deleted, and restore it
  (and re-order it too!)
* LoadManifestsStage no longer has the option to return manifests for testing;
  tests modified to match.
* EntryFileIO will report timeout after 10 minutes if queue blocks somehow.
* LoadManifestsStage handles this timeout and will raise it as a failure,
  but only secondary to any exception raised by the writer thread
* SUCCESS file can be configured with the number of files to list; this allows
  tests to assert on many thousands of files, although in production it is still
  fixed to a small number for performance reasons.

Change-Id: I642c1178928de427bf6e09f0fe0d345876311fb5
  • Loading branch information
steveloughran committed Jun 1, 2023
1 parent 8e83fdc commit b289707
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,18 @@ public class EntryFileIO {
private static final Logger LOG = LoggerFactory.getLogger(
EntryFileIO.class);

public static final int WRITER_SHUTDOWN_TIMEOUT = 60;
/**
* How long should the writer shutdown take?
*/
public static final int WRITER_SHUTDOWN_TIMEOUT_SECONDS = 60;

/**
* How long should trying to queue a write block before giving up
* with an error?
* This is a safety feature to ensure that if something has gone wrong
* in the queue code the job fails with an error rather than just hanging.
*/
public static final int WRITER_QUEUE_PUT_TIMEOUT_MINUTES = 10;

/** Configuration used to load filesystems. */
private final Configuration conf;
Expand Down Expand Up @@ -327,15 +338,20 @@ public boolean enqueue(List<FileEntry> entries) {
}
if (active.get()) {
try {
queue.put(new QueueEntry(Actions.write, entries));
LOG.debug("Queued {}", entries.size());
return true;
LOG.debug("Queueing {} entries", entries.size());
final boolean enqueued = queue.offer(new QueueEntry(Actions.write, entries),
WRITER_QUEUE_PUT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (!enqueued) {
LOG.warn("Timeout submitting entries to {}", this);
}
return enqueued;
} catch (InterruptedException e) {
Thread.interrupted();
return false;
}
} else {
LOG.warn("EntryFile write queue inactive; discarding {} entries", entries.size());
LOG.warn("EntryFile write queue inactive; discarding {} entries submitted to {}",
entries.size(), this);
return false;
}
}
Expand Down Expand Up @@ -424,7 +440,7 @@ public void close() throws IOException {
}
try {
// wait for the op to finish.
int total = FutureIO.awaitFuture(future, WRITER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
int total = FutureIO.awaitFuture(future, WRITER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
LOG.debug("Processed {} files", total);
executor.shutdown();
} catch (TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
Expand All @@ -35,6 +35,7 @@

import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_BYTES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_FILES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
Expand Down Expand Up @@ -84,23 +85,22 @@ protected CommitJobStage.Result executeStage(
LoadManifestsStage.Result result = new LoadManifestsStage(stageConfig).apply(
new LoadManifestsStage.Arguments(
File.createTempFile("manifest", ".list"),
false, /* do not cache manifests */
/* do not cache manifests */
stageConfig.getWriterQueueCapacity()));
LoadManifestsStage.SummaryInfo summary = result.getSummary();
LoadManifestsStage.SummaryInfo loadedManifestSummary = result.getSummary();
loadedManifestData = result.getLoadedManifestData();

LOG.debug("{}: Job Summary {}", getName(), summary);
LOG.debug("{}: Job Summary {}", getName(), loadedManifestSummary);
LOG.info("{}: Committing job with file count: {}; total size {} bytes",
getName(),
summary.getFileCount(),
String.format("%,d", summary.getTotalFileSize()));
loadedManifestSummary.getFileCount(),
String.format("%,d", loadedManifestSummary.getTotalFileSize()));
addHeapInformation(heapInfo, OP_STAGE_JOB_LOAD_MANIFESTS);


// add in the manifest statistics to our local IOStatistics for
// reporting.
IOStatisticsStore iostats = getIOStatistics();
iostats.aggregate(summary.getIOStatistics());
iostats.aggregate(loadedManifestSummary.getIOStatistics());

// prepare destination directories.
final CreateOutputDirectoriesStage.Result dirStageResults =
Expand All @@ -113,7 +113,9 @@ protected CommitJobStage.Result executeStage(
// and hence all aggregate stats from the tasks.
ManifestSuccessData successData;
successData = new RenameFilesStage(stageConfig).apply(
Pair.of(loadedManifestData, dirStageResults.getCreatedDirectories()));
Triple.of(loadedManifestData,
dirStageResults.getCreatedDirectories(),
stageConfig.getSuccessMarkerFileLimit()));
if (LOG.isDebugEnabled()) {
LOG.debug("{}: _SUCCESS file summary {}", getName(), successData.toJson());
}
Expand All @@ -124,10 +126,10 @@ protected CommitJobStage.Result executeStage(
// aggregating tasks.
iostats.setCounter(
COMMITTER_FILES_COMMITTED_COUNT,
summary.getFileCount());
loadedManifestSummary.getFileCount());
iostats.setCounter(
COMMITTER_BYTES_COMMITTED_COUNT,
summary.getTotalFileSize());
loadedManifestSummary.getTotalFileSize());
successData.snapshotIOStatistics(iostats);
successData.getIOStatistics().aggregate(heapInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ public class LoadManifestsStage extends
*/
private final SummaryInfo summaryInfo = new SummaryInfo();

/**
* List of loaded manifests.
*/
private final List<TaskManifest> manifests = new ArrayList<>();

/**
* Map of directories from manifests, coalesced to reduce duplication.
*/
Expand All @@ -91,12 +86,6 @@ public class LoadManifestsStage extends
*/
private EntryFileIO.EntryWriter entryWriter;

/**
* Should the manifests be cached and returned?
* only for testing.
*/
private boolean cacheManifests;

public LoadManifestsStage(final StageConfig stageConfig) {
super(false, stageConfig, OP_STAGE_JOB_LOAD_MANIFESTS, true);
}
Expand All @@ -117,20 +106,14 @@ protected LoadManifestsStage.Result executeStage(
LOG.info("{}: Executing Manifest Job Commit with manifests in {}",
getName(),
manifestDir);
cacheManifests = arguments.cacheManifests;

final Path entrySequenceData = arguments.getEntrySequenceData();

// the entry writer for queuing data.
entryWriter = entryFileIO.launchEntryWriter(
entryFileIO.createWriter(entrySequenceData),
arguments.queueCapacity);
// manifest list is only built up when caching is enabled.
// as this is memory hungry, it is warned about
List<TaskManifest> manifestList;
if (arguments.cacheManifests) {
LOG.info("Loaded manifests are cached; this is memory hungry");
}

try {

// sync fs before the list
Expand All @@ -142,7 +125,7 @@ protected LoadManifestsStage.Result executeStage(
haltableRemoteIterator(listManifests(),
() -> entryWriter.isActive());

manifestList = loadAllManifests(manifestFiles);
processAllManifests(manifestFiles);
maybeAddIOStatistics(getIOStatistics(), manifestFiles);

LOG.info("{}: Summary of {} manifests loaded in {}: {}",
Expand All @@ -158,38 +141,47 @@ protected LoadManifestsStage.Result executeStage(
entryWriter.maybeRaiseWriteException();

// collect any stats
} catch (EntryWriteException e) {
// something went wrong while writing.
// raise anything on the write thread,
entryWriter.maybeRaiseWriteException();

// falling back to that from the worker thread
throw e;
} finally {
// close which is a no-op if the clean close was invoked;
// it is not a no-op if something went wrong with reading/parsing/processing
// the manifests.
entryWriter.close();
}

final LoadedManifestData loadedManifestData = new LoadedManifestData(
new ArrayList<>(directories.values()), // new array to free up the map
entrySequenceData,
entryWriter.getCount());

return new LoadManifestsStage.Result(summaryInfo, loadedManifestData, manifestList);
return new LoadManifestsStage.Result(summaryInfo, loadedManifestData);
}

/**
* Load all the manifests.
* Load and process all the manifests.
* @param manifestFiles list of manifest files.
* @return the loaded manifests.
* @throws IOException IO Failure.
* @throws IOException failure to load/parse/queue
*/
private List<TaskManifest> loadAllManifests(
private void processAllManifests(
final RemoteIterator<FileStatus> manifestFiles) throws IOException {

trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () ->
TaskPool.foreach(manifestFiles)
.executeWith(getIOProcessors())
.stopOnFailure()
.run(this::processOneManifest));
return manifests;
}

/**
* Method invoked to process one manifest.
* @param status file to process.
* @throws IOException failure to load/parse
* @throws IOException failure to load/parse/queue
*/
private void processOneManifest(FileStatus status)
throws IOException {
Expand All @@ -200,9 +192,9 @@ private void processOneManifest(FileStatus status)

// update the directories
final int created = coalesceDirectories(manifest);
final String taskID = manifest.getTaskID();
LOG.debug("{}: task {} added {} directories",
getName(), taskID, created);
final String attemptID = manifest.getTaskAttemptID();
LOG.debug("{}: task attempt {} added {} directories",
getName(), attemptID, created);

// add to the summary.
summaryInfo.add(manifest);
Expand All @@ -213,20 +205,12 @@ private void processOneManifest(FileStatus status)
manifest.setIOStatistics(null);
manifest.getExtraData().clear();

// if manifests are cached add to the list
if (cacheManifests) {
// update the manifest list in a synchronized block.
synchronized (manifests) {
manifests.add(manifest);
}
}

// queue those files.
final boolean enqueued = entryWriter.enqueue(manifest.getFilesToCommit());
if (!enqueued) {
LOG.warn("{}: Failed to write manifest for task {}",
getName(),
taskID);
getName(), attemptID);
throw new EntryWriteException(attemptID);
}

}
Expand Down Expand Up @@ -301,21 +285,20 @@ public static final class Arguments {
*/
private final File entrySequenceFile;

/**
* build a list of manifests and return them?
*/
private final boolean cacheManifests;

/**
* Capacity for queue between manifest loader and the writers.
*/
private final int queueCapacity;

public Arguments(final File entrySequenceFile,
final boolean cacheManifests,
/**
* Arguments.
* @param entrySequenceFile path to local file to create for storing entries
* @param queueCapacity capacity of the queue
*/
public Arguments(
final File entrySequenceFile,
final int queueCapacity) {
this.entrySequenceFile = entrySequenceFile;
this.cacheManifests = cacheManifests;
this.queueCapacity = queueCapacity;
}

Expand All @@ -331,37 +314,42 @@ private Path getEntrySequenceData() {
public static final class Result {
private final SummaryInfo summary;

/**
* manifest list, non-null only if cacheManifests is true.
*/
private final List<TaskManifest> manifests;

/**
* Output of this stage to pass on to the subsequent stages.
*/
private final LoadedManifestData loadedManifestData;

public Result(SummaryInfo summary,
final LoadedManifestData loadedManifestData,
final List<TaskManifest> manifests) {
/**
* Result.
* @param summary summary of jobs
* @param loadedManifestData all loaded manifest data
*/
public Result(
final SummaryInfo summary,
final LoadedManifestData loadedManifestData) {
this.summary = summary;
this.manifests = manifests;
this.loadedManifestData = loadedManifestData;
}

/**
 * Get the summary passed to the constructor of this result.
 * @return the summary information.
 */
public SummaryInfo getSummary() {
return summary;
}

public List<TaskManifest> getManifests() {
return manifests;
}

/**
 * Get the loaded manifest data passed to the constructor of this result.
 * @return the loaded manifest data.
 */
public LoadedManifestData getLoadedManifestData() {
return loadedManifestData;
}
}

/**
 * IOE to raise on queueing failure.
 * Thrown when the files of a manifest could not be queued for writing
 * to the local entry file.
 */
public static final class EntryWriteException extends IOException {

  /**
   * Create the exception.
   * @param taskId identifier of the task (attempt) whose manifest data
   *               could not be written; included in the error message.
   */
  private EntryWriteException(String taskId) {
    // note the leading space before "to": without it the ID and the
    // rest of the sentence run together in the message.
    super("Failed to write manifest data for task "
        + taskId + " to local file");
  }
}
/**
* Summary information.
* Implementation note: atomic counters are used here to keep spotbugs quiet,
Expand All @@ -379,6 +367,11 @@ public static final class SummaryInfo implements IOStatisticsSource {
*/
private final List<String> taskIDs = new ArrayList<>();

/**
* Task attempt IDs.
*/
private final List<String> taskAttemptIDs = new ArrayList<>();

/**
* How many manifests were loaded.
*/
Expand Down Expand Up @@ -431,6 +424,10 @@ public List<String> getTaskIDs() {
return taskIDs;
}

/**
 * Get the task attempt IDs collected from the manifests added
 * to this summary.
 * The returned list is the internal list, not a copy.
 * @return the list of task attempt IDs.
 */
public List<String> getTaskAttemptIDs() {
return taskAttemptIDs;
}

/**
* Add all statistics; synchronized.
* @param manifest manifest to add.
Expand All @@ -442,6 +439,7 @@ public synchronized void add(TaskManifest manifest) {
directoryCount.addAndGet(manifest.getDestDirectories().size());
totalFileSize.addAndGet(manifest.getTotalFileSize());
taskIDs.add(manifest.getTaskID());
taskAttemptIDs.add(manifest.getTaskAttemptID());
}

/**
Expand Down

0 comments on commit b289707

Please sign in to comment.