Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-54] Handle remote data fetch failures due to executor removal #67

Merged
merged 8 commits into from
Jul 3, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,13 @@ void onContextClose() {
@Override
public void onChannelError(@Nullable final Throwable cause) {
setChannelError(cause);
if (cause == null) {
completedFuture.cancel(false);
} else {
completedFuture.completeExceptionally(cause);

if (currentByteBufInputStream != null) {
currentByteBufInputStream.byteBufQueue.closeExceptionally(cause);
}
onContextClose();
byteBufInputStreams.closeExceptionally(cause);
completedFuture.completeExceptionally(cause);
deregister();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class ClosableBlockingQueue<T> implements AutoCloseable {

private final Queue<T> queue;
private volatile boolean closed = false;
private volatile Throwable throwable = null;

/**
* Creates a closable blocking queue.
Expand Down Expand Up @@ -74,6 +75,14 @@ public synchronized void close() {
notifyAll();
}

/**
* Records a failure cause on this queue and then marks the input end of this queue as closed.
*
* The given throwable is stored before {@link #close()} is invoked. Subsequent calls to {@link #take()} and
* {@link #peek()} check the stored throwable on entry and, if it is non-null, throw a {@link RuntimeException}
* wrapping it.
*
* NOTE(review): the throwable check in {@link #take()} and {@link #peek()} appears before their wait loop, so
* threads that were already waiting when this method is called may not observe the failure — confirm.
*
* @param throwableToSet the failure cause to record; if {@code null}, no failure is recorded and the
*                       throwable check in {@link #take()} and {@link #peek()} does not fire.
*/
public synchronized void closeExceptionally(final Throwable throwableToSet) {
// Store the cause first so that it is already set when close() runs.
this.throwable = throwableToSet;
close();
}

/**
* Retrieves and removes the head of this queue, waiting if necessary.
*
Expand All @@ -82,6 +91,10 @@ public synchronized void close() {
*/
@Nullable
public synchronized T take() throws InterruptedException {
if (throwable != null) {
throw new RuntimeException(throwable);
}

while (queue.isEmpty() && !closed) {
wait();
}
Expand All @@ -97,6 +110,10 @@ public synchronized T take() throws InterruptedException {
*/
@Nullable
public synchronized T peek() throws InterruptedException {
if (throwable != null) {
throw new RuntimeException(throwable);
}

while (queue.isEmpty() && !closed) {
wait();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ public void run() {
* @param inputContext {@link ByteInputContext}
*/
public void onInputContext(final ByteInputContext inputContext) {
// Unconditionally rejects the call: this implementation contains no handling for an incoming input context.
// NOTE(review): presumably no input context is ever expected to reach this handler — confirm against callers.
throw new IllegalStateException("No logic here");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package edu.snu.nemo.runtime.executor.task;

import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
Expand All @@ -37,7 +36,7 @@ class ParentTaskDataFetcher extends DataFetcher {
private static final Logger LOG = LoggerFactory.getLogger(ParentTaskDataFetcher.class);

private final InputReader readersForParentTask;
private final LinkedBlockingQueue<DataUtil.IteratorWithNumBytes> dataQueue;
private final LinkedBlockingQueue iteratorQueue;

// Non-finals (lazy fetching)
private boolean hasFetchStarted;
Expand All @@ -54,7 +53,8 @@ class ParentTaskDataFetcher extends DataFetcher {
super(dataSource, child, metricMap, readerForParentTask.isSideInputReader(), isToSideInput);
this.readersForParentTask = readerForParentTask;
this.hasFetchStarted = false;
this.dataQueue = new LinkedBlockingQueue<>();
this.currentIteratorIndex = 0;
this.iteratorQueue = new LinkedBlockingQueue<>();
}

private void handleMetric(final DataUtil.IteratorWithNumBytes iterator) {
Expand Down Expand Up @@ -88,15 +88,15 @@ private void fetchInBackground() {
this.expectedNumOfIterators = futures.size();

futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
if (exception != null) {
throw new BlockFetchException(exception);
}

try {
dataQueue.put(iterator); // can block here
if (exception != null) {
iteratorQueue.put(exception); // can block here
} else {
iteratorQueue.put(iterator); // can block here
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new BlockFetchException(e);
throw new RuntimeException(e); // This shouldn't happen
}
}));
}
Expand All @@ -106,18 +106,16 @@ Object fetchDataElement() throws IOException {
try {
if (!hasFetchStarted) {
fetchInBackground();
hasFetchStarted = true;
this.currentIterator = dataQueue.take();
this.currentIteratorIndex = 1;
advanceIterator();
}

if (this.currentIterator.hasNext()) {
// This iterator has an element available
noElementAtAll = false;
return this.currentIterator.next();
} else {
// This iterator is done, proceed to the next iterator
if (currentIteratorIndex == expectedNumOfIterators) {
// No more iterator left
// Entire fetcher is done
if (noElementAtAll) {
// This shouldn't normally happen, except for cases such as when Beam's VoidCoder is used.
noElementAtAll = false;
Expand All @@ -127,16 +125,37 @@ Object fetchDataElement() throws IOException {
return null;
}
} else {
// Advance to the next one
handleMetric(currentIterator);
// Try the next iterator
this.currentIteratorIndex += 1;
this.currentIterator = dataQueue.take();
advanceIterator();
return fetchDataElement();
}
}
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
throw new IOException(exception);
} catch (Throwable e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please declare the caught exception as final here: `catch (final Throwable e)`.

// Any failure is caught and thrown as an IOException, so that the task is retried.
// In particular, we catch unchecked exceptions like RuntimeException thrown by DataUtil.IteratorWithNumBytes
// when remote data fetching fails for whatever reason.
throw new IOException(e);
}
}

/**
 * Blocks until the next fetch result is available on {@code iteratorQueue} and, if it is an iterator,
 * installs it as the current iterator.
 *
 * If the dequeued object is a {@link Throwable}, it is rethrown as-is and no fetcher state is modified.
 * Otherwise {@code hasFetchStarted} is set, the object becomes {@code currentIterator}, and
 * {@code currentIteratorIndex} is incremented by one.
 *
 * @throws Throwable the throwable taken from the queue, or the {@link InterruptedException} raised while
 *                   waiting on the queue.
 */
private void advanceIterator() throws Throwable {
  // An InterruptedException raised by take() propagates to the caller unchanged.
  final Object nextResult = iteratorQueue.take();

  // A queued throwable signals a failed fetch: surface it without touching any state.
  if (nextResult instanceof Throwable) {
    throw (Throwable) nextResult;
  }

  // A valid iterator was taken from the queue. Do advance.
  hasFetchStarted = true;
  this.currentIterator = (DataUtil.IteratorWithNumBytes) nextResult;
  this.currentIteratorIndex++;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (C) 2018 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.nemo.runtime.executor.task;

import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests {@link ParentTaskDataFetcher}.
 *
 * Every test hands the fetcher a mocked {@link InputReader} whose {@code read()} returns a single future,
 * and then checks what {@code fetchDataElement()} returns or throws.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({InputReader.class, VertexHarness.class})
public final class ParentTaskDataFetcherTest {

  /**
   * An already-completed future carrying an empty iterator must yield {@code Void.TYPE}.
   *
   * @throws Exception if the fetcher fails unexpectedly.
   */
  @Test(timeout = 5000)
  public void testEmpty() throws Exception {
    // InputReader
    final List<String> dataElements = new ArrayList<>(0); // empty data
    final InputReader inputReader = generateInputReader(generateCompletableFuture(dataElements.iterator()));

    // Fetcher
    final ParentTaskDataFetcher fetcher = createFetcher(inputReader);

    // Should return Void.TYPE
    assertEquals(Void.TYPE, fetcher.fetchDataElement());
  }

  /**
   * An already-completed future carrying one element must yield that element, followed by {@code null}.
   *
   * @throws Exception if the fetcher fails unexpectedly.
   */
  @Test(timeout = 5000)
  public void testNonEmpty() throws Exception {
    // InputReader
    final String singleData = "Single";
    final List<String> dataElements = new ArrayList<>(1);
    dataElements.add(singleData); // Single element
    final InputReader inputReader = generateInputReader(generateCompletableFuture(dataElements.iterator()));

    // Fetcher
    final ParentTaskDataFetcher fetcher = createFetcher(inputReader);

    // Should return only a single element
    assertEquals(singleData, fetcher.fetchDataElement());
    assertNull(fetcher.fetchDataElement());
  }

  /**
   * A future that fails after a delay must surface as an {@link IOException} from the fetcher.
   *
   * @throws Exception the expected {@link IOException}, or any unexpected failure.
   */
  @Test(timeout = 5000, expected = IOException.class)
  public void testErrorWhenRPC() throws Exception {
    // Dedicated executor for the failing future; shut down in 'finally' so that its thread does not leak.
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      // Failing future
      final CompletableFuture failingFuture = CompletableFuture.runAsync(() -> {
        try {
          Thread.sleep(2 * 1000); // Block the fetcher for 2 seconds
        } catch (final InterruptedException e) {
          // This shouldn't happen.
          // We restore the interrupt flag and return normally, so that the future does not fail,
          // IOException does not occur, and the test fails.
          Thread.currentThread().interrupt();
          return;
        }
        throw new RuntimeException(); // Fail this future
      }, executor);
      final InputReader inputReader = generateInputReader(failingFuture);

      // Fetcher
      final ParentTaskDataFetcher fetcher = createFetcher(inputReader);

      // Should throw an IOException
      try {
        fetcher.fetchDataElement();
      } catch (final IOException e) {
        // Verify the cause of the failure before handing the exception over to JUnit.
        // (An assertion placed after the throwing call would never be executed.)
        assertTrue(failingFuture.isCompletedExceptionally());
        throw e; // checked by 'expected = IOException.class'
      }
    } finally {
      executor.shutdownNow();
    }
  }

  /**
   * An iterator whose methods throw must surface as an {@link IOException} from the fetcher.
   *
   * @throws Exception the expected {@link IOException}, or any unexpected failure.
   */
  @Test(timeout = 5000, expected = IOException.class)
  public void testErrorWhenReadingData() throws Exception {
    // Failed iterator
    final InputReader inputReader = generateInputReader(generateCompletableFuture(new FailedIterator()));

    // Fetcher
    final ParentTaskDataFetcher fetcher = createFetcher(inputReader);

    // Should throw an IOException
    fetcher.fetchDataElement(); // checked by 'expected = IOException.class'
  }

  /**
   * Builds a fetcher around the given reader; every other constructor argument is a mock or an empty map.
   *
   * @param readerForParentTask the reader that the fetcher reads from.
   * @return the fetcher under test.
   */
  private ParentTaskDataFetcher createFetcher(final InputReader readerForParentTask) {
    return new ParentTaskDataFetcher(
        mock(IRVertex.class),
        readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
        mock(VertexHarness.class),
        new HashMap<>(0),
        false);
  }

  /**
   * Creates a mocked reader whose {@code read()} returns a list containing only the given future.
   *
   * @param completableFuture the single future to be returned by the reader.
   * @return the mocked reader.
   */
  private InputReader generateInputReader(final CompletableFuture completableFuture) {
    final InputReader inputReader = mock(InputReader.class);
    when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));
    return inputReader;
  }

  /**
   * Wraps the given iterator with {@code DataUtil.IteratorWithNumBytes.of} in an already-completed future.
   *
   * @param iterator the iterator to wrap.
   * @return the completed future.
   */
  private CompletableFuture generateCompletableFuture(final Iterator iterator) {
    return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(iterator));
  }

  /**
   * An iterator whose every method throws a {@link RuntimeException}.
   * Declared static because it does not use any state of the enclosing test instance.
   */
  private static final class FailedIterator implements Iterator<Object> {
    @Override
    public boolean hasNext() {
      throw new RuntimeException("Fail");
    }

    @Override
    public Object next() {
      throw new RuntimeException("Fail");
    }
  }
}