Skip to content

Commit

Permalink
[NEMO-54] Handle remote data fetch failures due to executor removal (#67
Browse files Browse the repository at this point in the history
)

JIRA: [NEMO-54: Handle remote data fetch failures due to executor removal](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-54)

**Major changes:**
- Catch all Throwables in ParentTaskDataFetcher, wrap them in an IOException, and throw the IOException over to TaskExecutor
- Handle CompletableFuture failures by passing the throwable to TaskExecutor via iteratorQueue

**Minor changes to note:**
- Introduce ClosableBlockingQueue#closeExceptionally, with the hope to propagate errors all the way up to ParentTaskDataFetcher

**Tests for the changes:**
- ParentTaskDataFetcherTest

**Other comments:**
- I'll add fault-injected integration tests after https://issues.apache.org/jira/browse/NEMO-55

resolves [NEMO-54](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-54)
  • Loading branch information
johnyangk authored and sanha committed Jul 3, 2018
1 parent 5305e6b commit df44252
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,13 @@ void onContextClose() {
@Override
public void onChannelError(@Nullable final Throwable cause) {
setChannelError(cause);
if (cause == null) {
completedFuture.cancel(false);
} else {
completedFuture.completeExceptionally(cause);

if (currentByteBufInputStream != null) {
currentByteBufInputStream.byteBufQueue.closeExceptionally(cause);
}
onContextClose();
byteBufInputStreams.closeExceptionally(cause);
completedFuture.completeExceptionally(cause);
deregister();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class ClosableBlockingQueue<T> implements AutoCloseable {

private final Queue<T> queue;
private volatile boolean closed = false;
private volatile Throwable throwable = null;

/**
* Creates a closable blocking queue.
Expand Down Expand Up @@ -74,6 +75,14 @@ public synchronized void close() {
notifyAll();
}

/**
 * Mark the input end of this queue as closed because of a failure.
 * The given throwable is stored first and then {@link #close()} is invoked, so the queue
 * ends up closed either way. Once a non-null throwable is stored, {@link #take()} and
 * {@link #peek()} throw a {@link RuntimeException} wrapping it when they are entered.
 *
 * NOTE(review): the visible check in take()/peek() runs only on entry, before the wait loop;
 * confirm whether a caller that is already blocked in wait() also observes the failure.
 *
 * @param throwableToSet the cause of the failure. If {@code null}, no failure is recorded and
 *                       this call has the same visible effect as {@link #close()}.
 */
public synchronized void closeExceptionally(final Throwable throwableToSet) {
this.throwable = throwableToSet;
close();
}

/**
* Retrieves and removes the head of this queue, waiting if necessary.
*
Expand All @@ -82,6 +91,10 @@ public synchronized void close() {
*/
@Nullable
public synchronized T take() throws InterruptedException {
if (throwable != null) {
throw new RuntimeException(throwable);
}

while (queue.isEmpty() && !closed) {
wait();
}
Expand All @@ -97,6 +110,10 @@ public synchronized T take() throws InterruptedException {
*/
@Nullable
public synchronized T peek() throws InterruptedException {
if (throwable != null) {
throw new RuntimeException(throwable);
}

while (queue.isEmpty() && !closed) {
wait();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ public void run() {
* @param inputContext {@link ByteInputContext}
*/
public void onInputContext(final ByteInputContext inputContext) {
// Rejects every call unconditionally; the argument is never inspected.
// NOTE(review): presumably no input context is expected on this path - confirm against the enclosing class.
throw new IllegalStateException("No logic here");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package edu.snu.nemo.runtime.executor.task;

import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
Expand All @@ -37,7 +36,7 @@ class ParentTaskDataFetcher extends DataFetcher {
private static final Logger LOG = LoggerFactory.getLogger(ParentTaskDataFetcher.class);

private final InputReader readersForParentTask;
private final LinkedBlockingQueue<DataUtil.IteratorWithNumBytes> dataQueue;
private final LinkedBlockingQueue iteratorQueue;

// Non-finals (lazy fetching)
private boolean hasFetchStarted;
Expand All @@ -54,7 +53,8 @@ class ParentTaskDataFetcher extends DataFetcher {
super(dataSource, child, metricMap, readerForParentTask.isSideInputReader(), isToSideInput);
this.readersForParentTask = readerForParentTask;
this.hasFetchStarted = false;
this.dataQueue = new LinkedBlockingQueue<>();
this.currentIteratorIndex = 0;
this.iteratorQueue = new LinkedBlockingQueue<>();
}

private void handleMetric(final DataUtil.IteratorWithNumBytes iterator) {
Expand Down Expand Up @@ -88,15 +88,15 @@ private void fetchInBackground() {
this.expectedNumOfIterators = futures.size();

futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
if (exception != null) {
throw new BlockFetchException(exception);
}

try {
dataQueue.put(iterator); // can block here
if (exception != null) {
iteratorQueue.put(exception); // can block here
} else {
iteratorQueue.put(iterator); // can block here
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new BlockFetchException(e);
throw new RuntimeException(e); // This shouldn't happen
}
}));
}
Expand All @@ -106,18 +106,16 @@ Object fetchDataElement() throws IOException {
try {
if (!hasFetchStarted) {
fetchInBackground();
hasFetchStarted = true;
this.currentIterator = dataQueue.take();
this.currentIteratorIndex = 1;
advanceIterator();
}

if (this.currentIterator.hasNext()) {
// This iterator has an element available
noElementAtAll = false;
return this.currentIterator.next();
} else {
// This iterator is done, proceed to the next iterator
if (currentIteratorIndex == expectedNumOfIterators) {
// No more iterator left
// Entire fetcher is done
if (noElementAtAll) {
// This shouldn't normally happen, except for cases such as when Beam's VoidCoder is used.
noElementAtAll = false;
Expand All @@ -127,16 +125,39 @@ Object fetchDataElement() throws IOException {
return null;
}
} else {
// Advance to the next one
handleMetric(currentIterator);
// Try the next iterator
this.currentIteratorIndex += 1;
this.currentIterator = dataQueue.take();
advanceIterator();
return fetchDataElement();
}
}
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
throw new IOException(exception);
} catch (final Throwable e) {
// Any failure is caught and thrown as an IOException, so that the task is retried.
// In particular, we catch unchecked exceptions like RuntimeException thrown by DataUtil.IteratorWithNumBytes
// when remote data fetching fails for whatever reason.
// Note that we rely on unchecked exceptions because the methods of the Iterator interface do not declare
// a checked exception (i.e., a "throws Exception" clause) that the TaskExecutor thread could catch and handle.
throw new IOException(e);
}
}

/**
 * Waits for the next entry of iteratorQueue and moves this fetcher on to it.
 * An entry is either a fetched iterator or the throwable that made a fetch fail.
 *
 * @throws Throwable the throwable taken from the queue if the fetch failed, or the
 *                   InterruptedException raised while waiting on the queue.
 */
private void advanceIterator() throws Throwable {
  // An InterruptedException from take() simply propagates to the caller.
  final Object nextEntry = iteratorQueue.take();

  // A failed fetch is delivered through the queue as a Throwable: surface it as-is.
  if (nextEntry instanceof Throwable) {
    throw (Throwable) nextEntry;
  }

  // A valid iterator has arrived: only now is the fetch considered started.
  this.currentIterator = (DataUtil.IteratorWithNumBytes) nextEntry;
  this.currentIteratorIndex++;
  hasFetchStarted = true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (C) 2018 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.nemo.runtime.executor.task;

import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests {@link ParentTaskDataFetcher}.
 *
 * Every test hands the fetcher a mocked {@link InputReader} whose read() returns exactly one
 * future, so the fetcher sees a single parent iterator (or a single failure).
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({InputReader.class, VertexHarness.class})
public final class ParentTaskDataFetcherTest {

  /**
   * An empty parent iterator must make the fetcher return {@code Void.TYPE}.
   */
  @Test(timeout=5000)
  public void testEmpty() throws Exception {
    // InputReader
    final List<String> dataElements = new ArrayList<>(0); // empty data
    final InputReader inputReader = generateInputReader(generateCompletableFuture(dataElements.iterator()));

    // Fetcher
    final ParentTaskDataFetcher fetcher = createFetcher(inputReader);

    // Should return Void.TYPE
    assertEquals(Void.TYPE, fetcher.fetchDataElement());
  }

  /**
   * A parent iterator with one element must yield that element and then {@code null}.
   */
  @Test(timeout=5000)
  public void testNonEmpty() throws Exception {
    // InputReader
    final String singleData = "Single";
    final List<String> dataElements = new ArrayList<>(1);
    dataElements.add(singleData); // Single element
    final InputReader inputReader = generateInputReader(generateCompletableFuture(dataElements.iterator()));

    // Fetcher
    final ParentTaskDataFetcher fetcher = createFetcher(inputReader);

    // Should return only a single element
    assertEquals(singleData, fetcher.fetchDataElement());
    assertEquals(null, fetcher.fetchDataElement());
  }

  /**
   * A future that fails after a delay must surface in the fetcher as an IOException.
   */
  @Test(timeout=5000, expected = IOException.class)
  public void testErrorWhenRPC() throws Exception {
    // The executor is owned by this test so that its worker thread is released when the test ends.
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    try {
      // Failing future
      final CompletableFuture failingFuture = CompletableFuture.runAsync(() -> {
        try {
          Thread.sleep(2 * 1000); // Block the fetcher for 2 seconds
          throw new RuntimeException(); // Fail this future
        } catch (InterruptedException e) {
          // This shouldn't happen.
          // We don't throw anything here, so that IOException does not occur and the test fails
        }
      }, executorService);
      final InputReader inputReader = generateInputReader(failingFuture);

      // Fetcher
      final ParentTaskDataFetcher fetcher = createFetcher(inputReader);

      try {
        // Should throw an IOException
        fetcher.fetchDataElement();
      } catch (final IOException e) {
        // Verify the cause here: a statement placed after the throwing call would never run.
        assertTrue(failingFuture.isCompletedExceptionally());
        throw e; // checked by 'expected = IOException.class'
      }
    } finally {
      executorService.shutdownNow();
    }
  }

  /**
   * An iterator that throws while being read must surface in the fetcher as an IOException.
   */
  @Test(timeout=5000, expected = IOException.class)
  public void testErrorWhenReadingData() throws Exception {
    // Failed iterator
    final InputReader inputReader = generateInputReader(generateCompletableFuture(new FailedIterator()));

    // Fetcher
    final ParentTaskDataFetcher fetcher = createFetcher(inputReader);

    // Should throw an IOException
    fetcher.fetchDataElement(); // checked by 'expected = IOException.class'
  }

  /**
   * Builds a fetcher around the given reader; every other collaborator is a plain mock.
   *
   * @param readerForParentTask the reader the fetcher pulls its futures from.
   * @return the fetcher under test.
   */
  private ParentTaskDataFetcher createFetcher(final InputReader readerForParentTask) {
    return new ParentTaskDataFetcher(
        mock(IRVertex.class),
        readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
        mock(VertexHarness.class),
        new HashMap<>(0),
        false);
  }

  /**
   * Mocks an InputReader whose read() returns a list holding only the given future.
   *
   * @param completableFuture the single future to expose through read().
   * @return the mocked reader.
   */
  private InputReader generateInputReader(final CompletableFuture completableFuture) {
    final InputReader inputReader = mock(InputReader.class);
    when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));
    return inputReader;
  }

  /**
   * Wraps the given iterator in an already-completed future.
   *
   * @param iterator the iterator to wrap.
   * @return a completed future holding the wrapped iterator.
   */
  private CompletableFuture generateCompletableFuture(final Iterator iterator) {
    return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(iterator));
  }

  /**
   * An iterator whose every operation fails, emulating a broken data stream.
   * Static because it needs nothing from the enclosing test instance.
   */
  private static final class FailedIterator implements Iterator<Object> {
    @Override
    public boolean hasNext() {
      throw new RuntimeException("Fail");
    }

    @Override
    public Object next() {
      throw new RuntimeException("Fail");
    }
  }
}

0 comments on commit df44252

Please sign in to comment.