Skip to content

Commit

Permalink
MAPREDUCE-7435. Mehakmeet comments, excluding timeouts.
Browse files Browse the repository at this point in the history
Change-Id: Ib93ba8ba632135a05da126a75f34e78bd381cf2a
  • Loading branch information
steveloughran committed Apr 26, 2023
1 parent f969993 commit 8e83fdc
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public static <S> RemoteIterator<S> haltableRemoteIterator(
* This is primarily for tests or when submitting work into a TaskPool.
* equivalent to
* <pre>
* for(long l = start, l &lt; finis; l++) yield l;
* for(long l = start, l &lt; excludedFinish; l++) yield l;
* </pre>
* @param start start value
* @param excludedFinish excluded finish
Expand Down Expand Up @@ -422,8 +422,8 @@ public void close() throws IOException {
/**
* Wrapper of another remote iterator; IOStatistics
* and Closeable methods are passed down if implemented.
* This class may be subclasses if custom iterators
* are needed.
* This class may be subclassed within the hadoop codebase
* if custom iterators are needed.
* @param <S> source type
* @param <T> type of returned value
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class EntryFileIO {
private static final Logger LOG = LoggerFactory.getLogger(
EntryFileIO.class);

public static final int WRITER_SHUTDOWN_TIMEOUT = 60;

/** Configuration used to load filesystems. */
private final Configuration conf;

Expand Down Expand Up @@ -212,7 +214,7 @@ private QueueEntry(final Actions action) {
* the output stream.
* Other threads can queue the file entry lists from loaded manifests
* for them to be written.
* The these threads will be blocked when the queue capacity is reached.
* These threads will be blocked when the queue capacity is reached.
* This is quite a complex process, with the main troublespots in the code
* being:
* - managing the shutdown
Expand Down Expand Up @@ -240,6 +242,8 @@ public static final class EntryWriter implements Closeable {
*/
private final AtomicBoolean active = new AtomicBoolean(false);

private final int capacity;

/**
* Executor of writes.
*/
Expand Down Expand Up @@ -271,6 +275,7 @@ public static final class EntryWriter implements Closeable {
private EntryWriter(SequenceFile.Writer writer, int capacity) {
checkState(capacity > 0, "invalid queue capacity %s", capacity);
this.writer = requireNonNull(writer);
this.capacity = capacity;
this.queue = new ArrayBlockingQueue<>(capacity);
}

Expand Down Expand Up @@ -330,7 +335,7 @@ public boolean enqueue(List<FileEntry> entries) {
return false;
}
} else {
LOG.debug("Queue inactive; discarding {} entries", entries.size());
LOG.warn("EntryFile write queue inactive; discarding {} entries", entries.size());
return false;
}
}
Expand Down Expand Up @@ -406,7 +411,9 @@ public void close() throws IOException {
// already stopped
return;
}
LOG.debug("Shutting down writer");
LOG.debug("Shutting down writer; entry lists in queue: {}",
capacity - queue.remainingCapacity());

// signal queue closure by queuing a stop option.
// this is added at the end of the list of queued blocks,
// of which are written.
Expand All @@ -417,7 +424,7 @@ public void close() throws IOException {
}
try {
// wait for the op to finish.
int total = FutureIO.awaitFuture(future, 30, TimeUnit.SECONDS);
int total = FutureIO.awaitFuture(future, WRITER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
LOG.debug("Processed {} files", total);
executor.shutdown();
} catch (TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
*/
@InterfaceAudience.Private
public final class InternalConstants {

private InternalConstants() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected CommitJobStage.Result executeStage(
storeSupportsResilientCommit());

// once the manifest has been loaded, a temp file needs to be
// deleted; so track teh value.
// deleted; so track the value.
LoadedManifestData loadedManifestData = null;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.EntryStatus;
import org.apache.hadoop.util.functional.TaskPool;
Expand Down Expand Up @@ -242,6 +243,7 @@ private void deleteDirWithFile(Path dir) throws IOException {
* and, if the operation took place, the list of created dirs.
* Reports progress on invocation.
* @param dirEntry entry
* @throws PathIOException if after multiple attempts, the dest dir couldn't be created.
* @throws IOException failure.
*/
private void createOneDirectory(final DirEntry dirEntry) throws IOException {
Expand Down Expand Up @@ -270,9 +272,17 @@ private void createOneDirectory(final DirEntry dirEntry) throws IOException {
* Try to efficiently and robustly create a directory in a method which is
* expected to be executed in parallel with operations creating
* peer directories.
* A return value of {@link DirMapState#dirWasCreated} or
* {@link DirMapState#dirCreatedOnSecondAttempt} indicates
* this thread did the creation.
* Other outcomes imply it already existed; if the directory
* cannot be created/found then a {@link PathIOException} is thrown.
* The outcome should be added to the {@link #dirMap} to avoid further creation attempts.
* @param dirEntry dir to create
* @return Outcome
* @throws IOException IO Failure.
* @return Outcome of the operation, such as whether the entry was created, found in store.
* It will always be a success outcome of some form.
* @throws PathIOException if after multiple attempts, the dest dir couldn't be created.
* @throws IOException Other IO failure
*/
private DirMapState maybeCreateOneDirectory(DirEntry dirEntry) throws IOException {
final EntryStatus status = dirEntry.getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ this queue size should not be a limit on manifest load performance.

It can help limit the amount of memory consumed during manifest load during
job commit.
The maximumum number of loaded manifests will be
The maximum number of loaded manifests will be:

```
mapreduce.manifest.committer.writer.queue.capacity + mapreduce.manifest.committer.io.threads
Expand Down

0 comments on commit 8e83fdc

Please sign in to comment.