Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -375,15 +375,19 @@ public boolean isFinished() {

@Override
public void requestPartitions() throws IOException, InterruptedException {
// Sanity check
if (numberOfInputChannels != inputChannels.size()) {
throw new IllegalStateException("Bug in input gate setup logic: mismatch between" +
"number of total input channels and the currently set number of input " +
"channels.");
}

synchronized (requestLock) {
if (!requestedPartitionsFlag) {
if (isReleased) {
throw new IllegalStateException("Already released.");
}

// Sanity checks
if (numberOfInputChannels != inputChannels.size()) {
throw new IllegalStateException("Bug in input gate setup logic: mismatch between" +
"number of total input channels and the currently set number of input " +
"channels.");
}

for (InputChannel inputChannel : inputChannels.values()) {
inputChannel.requestSubpartition(consumedSubpartitionIndex);
}
Expand All @@ -404,14 +408,14 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep
return null;
}

if (isReleased) {
throw new IllegalStateException("Already released.");
}

requestPartitions();

InputChannel currentChannel = null;
while (currentChannel == null) {
if (isReleased) {
throw new IllegalStateException("Released");
}

currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,8 +851,14 @@ else if (current == ExecutionState.RUNNING) {
// because the canceling may block on user code, we cancel from a separate thread
// we do not reuse the async call handler, because that one may be blocked, in which
// case the canceling could not continue
Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask,
taskCancellationInterval);
Runnable canceler = new TaskCanceler(
LOG,
invokable,
executingThread,
taskNameWithSubtask,
taskCancellationInterval,
producedPartitions,
inputGates);
Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
"Canceler for " + taskNameWithSubtask);
cancelThread.setDaemon(true);
Expand Down Expand Up @@ -1097,14 +1103,25 @@ private static class TaskCanceler implements Runnable {
private final Thread executer;
private final String taskName;
private final long taskCancellationIntervalMillis;
private final ResultPartition[] producedPartitions;
private final SingleInputGate[] inputGates;

public TaskCanceler(
Logger logger,
AbstractInvokable invokable,
Thread executer,
String taskName,
long cancelationInterval,
ResultPartition[] producedPartitions,
SingleInputGate[] inputGates) {

public TaskCanceler(Logger logger, AbstractInvokable invokable,
Thread executer, String taskName, long cancelationInterval) {
this.logger = logger;
this.invokable = invokable;
this.executer = executer;
this.taskName = taskName;
this.taskCancellationIntervalMillis = cancelationInterval;
this.producedPartitions = producedPartitions;
this.inputGates = inputGates;
}

@Override
Expand All @@ -1119,6 +1136,28 @@ public void run() {
logger.error("Error while canceling the task", t);
}

// Early release of input and output buffer pools. We do this
// in order to unblock async Threads, which produce/consume the
// intermediate streams outside of the main Task Thread.
//
// Don't do this before cancelling the invokable. Otherwise we
// will get misleading errors in the logs.
for (ResultPartition partition : producedPartitions) {
try {
partition.destroyBufferPool();
} catch (Throwable t) {
LOG.error("Failed to release result partition buffer pool.", t);
}
}

for (SingleInputGate inputGate : inputGates) {
try {
inputGate.releaseAllResources();
} catch (Throwable t) {
LOG.error("Failed to release input gate.", t);
}
}

// interrupt the running thread initially
executer.interrupt();
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.buffer;

import org.apache.flink.core.memory.MemoryType;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

public class LocalBufferPoolDestroyTest {

/**
* Tests that a blocking request fails properly if the buffer pool is
* destroyed.
*
* <p>Starts a Thread, which triggers an unsatisfiable blocking buffer
* request. After making sure that the Thread is actually waiting in the
* blocking call, the buffer pool is destroyed and we check whether the
* request Thread threw the expected Exception.
*/
@Test
public void testDestroyWhileBlockingRequest() throws Exception {
AtomicReference<Exception> asyncException = new AtomicReference<>();

NetworkBufferPool networkBufferPool = null;
LocalBufferPool localBufferPool = null;

try {
networkBufferPool = new NetworkBufferPool(1, 4096, MemoryType.HEAP);
localBufferPool = new LocalBufferPool(networkBufferPool, 1);

// Drain buffer pool
assertNotNull(localBufferPool.requestBuffer());
assertNull(localBufferPool.requestBuffer());

// Start request Thread
Thread thread = new Thread(new BufferRequestTask(localBufferPool, asyncException));
thread.start();

// Wait for request
boolean success = false;

for (int i = 0; i < 50; i++) {
StackTraceElement[] stackTrace = thread.getStackTrace();
success = isInBlockingBufferRequest(stackTrace);

if (success) {
break;
} else {
// Retry
Thread.sleep(500);
}
}

// Verify that Thread was in blocking request
assertTrue("Did not trigger blocking buffer request.", success);

// Destroy the buffer pool
localBufferPool.lazyDestroy();

// Wait for Thread to finish
thread.join();

// Verify expected Exception
assertNotNull("Did not throw expected Exception", asyncException.get());
assertTrue(asyncException.get() instanceof IllegalStateException);
} finally {
if (localBufferPool != null) {
localBufferPool.lazyDestroy();
}

if (networkBufferPool != null) {
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
}
}
}

/**
* Returns whether the stack trace represents a Thread in a blocking buffer
* request.
*
* @param stackTrace Stack trace of the Thread to check
*
* @return Flag indicating whether the Thread is in a blocking buffer
* request or not
*/
private boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) {
if (stackTrace.length >= 3) {
return stackTrace[0].getMethodName().equals("wait") &&
stackTrace[1].getMethodName().equals("requestBuffer") &&
stackTrace[2].getMethodName().equals("requestBufferBlocking");
} else {
return false;
}
}

/**
* Task triggering a blocking buffer request (the test assumes that no
* buffer is available).
*/
private static class BufferRequestTask implements Runnable {

private final BufferPool bufferPool;
private final AtomicReference<Exception> asyncException;

public BufferRequestTask(BufferPool bufferPool, AtomicReference<Exception> asyncException) {
this.bufferPool = bufferPool;
this.asyncException = asyncException;
}

@Override
public void run() {
try {
String msg = "Test assumption violated: expected no available buffer";
assertNull(msg, bufferPool.requestBuffer());

bufferPool.requestBufferBlocking();
} catch (Exception t) {
asyncException.set(t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
import scala.Tuple2;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -190,6 +192,100 @@ public void testUpdateChannelBeforeRequest() throws Exception {
any(ResultPartitionID.class), anyInt(), any(BufferProvider.class));
}

/**
* Tests that the release of the input gate is noticed while polling the
* channels for available data.
*/
@Test
public void testReleaseWhilePollingChannel() throws Exception {
final AtomicReference<Exception> asyncException = new AtomicReference<>();

// Setup the input gate with a single channel that does nothing
final SingleInputGate inputGate = new SingleInputGate(
"InputGate",
new JobID(),
new ExecutionAttemptID(),
new IntermediateDataSetID(),
0,
1,
mock(PartitionStateChecker.class));

InputChannel unknown = new UnknownInputChannel(
inputGate,
0,
new ResultPartitionID(),
new ResultPartitionManager(),
new TaskEventDispatcher(),
new LocalConnectionManager(),
new Tuple2<>(0, 0));

inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);

// Start the consumer in a separate Thread
Thread asyncConsumer = new Thread() {
@Override
public void run() {
try {
inputGate.getNextBufferOrEvent();
} catch (Exception e) {
asyncException.set(e);
}
}
};
asyncConsumer.start();

// Wait for blocking queue poll call and release input gate
boolean success = false;
for (int i = 0; i < 50; i++) {
if (asyncConsumer != null && asyncConsumer.isAlive()) {
StackTraceElement[] stackTrace = asyncConsumer.getStackTrace();
success = isInBlockingQueuePoll(stackTrace);
}

if (success) {
break;
} else {
// Retry
Thread.sleep(500);
}
}

// Verify that async consumer is in blocking request
assertTrue("Did not trigger blocking buffer request.", success);

// Release the input gate
inputGate.releaseAllResources();

// Wait for Thread to finish and verify expected Exceptions. If the
// input gate status is not properly checked during requests, this
// call will never return.
asyncConsumer.join();

assertNotNull(asyncException.get());
assertEquals(IllegalStateException.class, asyncException.get().getClass());
}

/**
* Returns whether the stack trace represents a Thread in a blocking queue
* poll call.
*
* @param stackTrace Stack trace of the Thread to check
*
* @return Flag indicating whether the Thread is in a blocking queue poll
* call.
*/
private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
for (StackTraceElement elem : stackTrace) {
if (elem.getMethodName().equals("poll") &&
elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {

return true;
}
}

return false;
}

// ---------------------------------------------------------------------------------------------

static void verifyBufferOrEvent(
Expand Down
Loading