Skip to content

Commit

Permalink
MAPREDUCE-7435. Async queue/write working
Browse files Browse the repository at this point in the history
* TestEntryFileIO extended for ths
* ABFS terasort test happy!

Change-Id: I068861973114d9947f3d22eaf32a6ee3b7ca8fa2
TODO: fault injection on the writes
  • Loading branch information
steveloughran committed Apr 6, 2023
1 parent 26d50d5 commit 2d0dcce
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
Expand Down Expand Up @@ -141,12 +143,12 @@ public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
* @return number of entries written
* @throws IOException write failure.
*/
public static int write(SequenceFile.Writer writer,
public static int write(SequenceFile.Writer writer,
Collection<FileEntry> entries,
boolean close)
throws IOException {
try {
for (FileEntry entry: entries) {
for (FileEntry entry : entries) {
writer.append(NullWritable.get(), entry);
}
writer.flush();
Expand Down Expand Up @@ -200,7 +202,7 @@ public final class EntryWriter implements Closeable {
* count of file entries saved; only updated in one thread
* so volatile.
*/
private volatile int count;
private final AtomicInteger count = new AtomicInteger();

/**
* any failure.
Expand Down Expand Up @@ -230,7 +232,7 @@ public boolean isActive() {
* @return the count
*/
public int getCount() {
return count;
return count.get();
}

/**
Expand All @@ -249,6 +251,7 @@ private void start() {
active.set(true);
executor = HadoopExecutors.newSingleThreadExecutor();
future = executor.submit(this::processor);
LOG.debug("Started entry writer {}", this);
}

/**
Expand All @@ -258,12 +261,19 @@ private void start() {
*/
public boolean enqueue(List<FileEntry> entries) {
if (entries.isEmpty()) {
LOG.debug("ignoring enqueue of empty list");
// exit fast, but return true.
return true;
}
if (active.get()) {
queue.add(entries);
return false;
try {
queue.put(entries);
LOG.debug("Queued {}", entries.size());
return true;
} catch (InterruptedException e) {
Thread.interrupted();
return false;
}
} else {
LOG.debug("Queue inactive; discarding {} entries", entries.size());
return false;
Expand All @@ -276,22 +286,27 @@ public boolean enqueue(List<FileEntry> entries) {
* @throws UncheckedIOException on write failure
*/
private int processor() {
int count = 0;
try {
while (!stop.get()) {
final List<FileEntry> entries = queue.take();
if (entries.isEmpty()) {
// exit list reached.
LOG.debug("List termination initiated");
stop.set(true);
} else {
entries.forEach(this::append);
LOG.debug("Adding block of {} entries", entries.size());
for (FileEntry entry : entries) {
append(entry);
}
}
}
} catch (InterruptedException e) {
// assume we are stopped here
LOG.debug("interrupted", e);
active.set(false);
stop.set(true);
}
return count;
return count.get();
}

/**
Expand All @@ -300,14 +315,15 @@ private int processor() {
* @throws UncheckedIOException on write failure
*/
private void append(FileEntry entry) {
if (failure != null) {
try {
writer.append(NullWritable.get(), entry);
count++;
} catch (IOException e) {
failure = e;
throw new UncheckedIOException(e);
}
try {
writer.append(NullWritable.get(), entry);

final int c = count.incrementAndGet();
LOG.trace("Added entry #{}: {}", c, entry);
} catch (IOException e) {
LOG.debug("Write failure", e);
failure = e;
throw new UncheckedIOException(e);
}
}

Expand All @@ -329,7 +345,11 @@ public void close() throws IOException {
}
LOG.debug("Shutting down writer");
// signal queue closure by queue an empty list
queue.add(new ArrayList<>());
try {
queue.put(new ArrayList<>());
} catch (InterruptedException e) {
Thread.interrupted();
}
try {
// wait for the op to finish.
int total = FutureIO.awaitFuture(future, 1, TimeUnit.MINUTES);
Expand All @@ -353,20 +373,35 @@ public void maybeRaiseWriteException() throws IOException {
throw failure;
}
}

@Override
public String toString() {
return "EntryWriter{" +
"stop=" + stop.get() +
", active=" + active.get() +
", count=" + count.get() +
", queue depth=" + queue.size() +
", failure=" + failure +
'}';
}
}

/**
* Iterator to retrieve file entries from the sequence file.
* Closeable; it will close automatically when the last element is read.
* No thread safety.
*/
private final class EntryIterator implements RemoteIterator<FileEntry>, Closeable {
@VisibleForTesting
final class EntryIterator implements RemoteIterator<FileEntry>, Closeable {

private final SequenceFile.Reader reader;

private FileEntry fetched;

private boolean closed;

private int count;

private EntryIterator(final SequenceFile.Reader reader) {
this.reader = requireNonNull(reader);
}
Expand All @@ -379,6 +414,15 @@ public void close() throws IOException {
}
}

@Override
public String toString() {
return "EntryIterator{" +
"closed=" + closed +
", count=" + count +
", fetched=" + fetched +
'}';
}

@Override
public boolean hasNext() throws IOException {
return fetched != null || fetchNext();
Expand All @@ -388,6 +432,7 @@ private boolean fetchNext() throws IOException {
FileEntry readBack = new FileEntry();
if (reader.next(NullWritable.get(), readBack)) {
fetched = readBack;
count++;
return true;
} else {
fetched = null;
Expand All @@ -406,6 +451,17 @@ public FileEntry next() throws IOException {
return r;
}

/**
* Is the stream closed.
* @return true if closed.
*/
public boolean isClosed() {
return closed;
}

int getCount() {
return count;
}
}

}

0 comments on commit 2d0dcce

Please sign in to comment.