From ee70fa7f6730ee6415e5cd1dcfcc7ebcb4fb8be1 Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Fri, 20 Feb 2015 12:34:19 +0100
Subject: [PATCH 1/5] [FLINK-1606] [tests] Fixes JobManagerFailsITCase for
 Akka 2.2.1 by setting gate-invalid-addresses-for = 5 s and
 quarantine-systems-for = off
---
 .../java/org/apache/flink/configuration/ConfigConstants.java | 2 +-
 .../main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala | 3 +++
 .../api/scala/runtime/jobmanager/JobManagerFailsITCase.scala | 4 ++--
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 767648aaf0ab3..e5da77f6793c4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -563,7 +563,7 @@ public final class ConfigConstants {
 
 	public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;
 
-	public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;
+	public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = true;
 
 	public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
 
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index f6062cae1948e..7aa842b06bc83 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -232,6 +232,9 @@ object AkkaUtils {
     |  }
     |
     |  remote {
+    |    quarantine-systems-for = off
+    |    gate-invalid-addresses-for = 5 s
+    |
     |    startup-timeout = $startupTimeout
     |
     |    transport-failure-detector{
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 416470f90bbd5..6835dfa6cccef 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -49,7 +49,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
   "A TaskManager" should {
     "detect a lost connection to the JobManager and try to reconnect to it" in {
-      val num_slots = 11
+      val num_slots = 13
      val cluster = startDeathwatchCluster(num_slots, 1)
 
       val tm = cluster.getTaskManagers(0)
@@ -81,7 +81,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     }
 
     "go into a clean state in case of a JobManager failure" in {
-      val num_slots = 20
+      val num_slots = 36
 
       val sender = new AbstractJobVertex("BlockingSender")
       sender.setParallelism(num_slots)
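The two remote settings above are what make the test pass on Akka 2.2.1: `quarantine-systems-for = off` keeps a restarted JobManager from being quarantined permanently, and `gate-invalid-addresses-for = 5 s` bounds how long an invalid address is gated before a reconnect attempt. A minimal sketch for sanity-checking such a section with the Typesafe Config API that ships with Akka; this is not part of the patch, and the class name is illustrative:

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class RemoteSettingsCheck {

    public static void main(String[] args) {
        // Mirrors the remote section that AkkaUtils assembles above.
        Config config = ConfigFactory.parseString(
                "akka.remote.quarantine-systems-for = off\n" +
                "akka.remote.gate-invalid-addresses-for = 5 s\n");

        // "off" disables permanent quarantining of remote systems.
        System.out.println(config.getString("akka.remote.quarantine-systems-for"));

        // Invalid addresses are gated for five seconds before a retry.
        System.out.println(config.getString("akka.remote.gate-invalid-addresses-for"));
    }
}
```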
From 9659d392391585f5ff84e67fd5a4a79f3d110fba Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Fri, 20 Feb 2015 17:02:04 +0100
Subject: [PATCH 2/5] [FLINK-1607] [tests] Fixes SimpleRecoveryITCase by
 rebalancing the data sources so that all mappers receive at least one
 element
---
 .../flink/configuration/ConfigConstants.java       |  2 +-
 .../flink/runtime/deployment/PartitionInfo.java    |  7 +++++++
 .../io/network/api/reader/BufferReader.java        |  1 -
 .../partition/IntermediateResultPartition.java     | 15 ++++++++++-----
 .../IntermediateResultPartitionManager.java        |  6 +++++-
 .../partition/consumer/RemoteInputChannel.java     |  4 ++--
 .../flink/runtime/taskmanager/TaskManager.scala    |  1 -
 .../flink/test/recovery/SimpleRecoveryITCase.java  |  4 ++++
 tools/log4j-travis.properties                      |  2 +-
 9 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index e5da77f6793c4..767648aaf0ab3 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -563,7 +563,7 @@ public final class ConfigConstants {
 
 	public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;
 
-	public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = true;
+	public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;
 
 	public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
index 333340acf43ad..2a0e8b1d362f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
@@ -157,4 +157,11 @@ public static PartitionInfo[] fromEdges(ExecutionEdge[] edges, SimpleSlot consum
 
 		return partitions;
 	}
+
+	@Override
+	public String toString() {
+		return String.format("PartitionInfo(PartitionID: %s, ProducerID: %s, " +
+				"ProducerLocation: %s, ProducerAddress: %s)", partitionId, producerExecutionId,
+				producerLocation, producerAddress);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index fca27faede1b5..7869868b1c81f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -208,7 +208,6 @@ else if (partitionInfo.getProducerLocation() == PartitionLocation.LOCAL) {
 
 			inputChannels.put(partitionId, newChannel);
 
-			newChannel.requestIntermediateResultPartition(queueToRequest);
 
 			for (TaskEvent event : pendingEvents) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
index 71af7a64b3d10..e6cd140deaa6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
@@ -186,6 +186,9 @@ public void finish() throws IOException {
 	}
 
 	public void releaseAllResources() throws IOException {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Release all resources of {}.", this);
+		}
 		synchronized (queues) {
 			if (!isReleased) {
 				try {
@@ -229,11 +232,6 @@ public IntermediateResultPartitionQueueIterator getQueueIterator(int queueIndex,
 
 	// ------------------------------------------------------------------------
 
-	@Override
-	public String toString() {
-		return "Intermediate result partition " + partitionId + " [num queues: " + queues.length + ", " + (isFinished ?
-				"finished" : "not finished") + "]";
-	}
-
 	private void checkInProducePhase() {
 		checkState(!isReleased, "Partition has already been discarded.");
 		checkState(!isFinished, "Partition has already been finished.");
@@ -309,4 +307,11 @@ public static IntermediateResultPartition create(RuntimeEnvironment environment,
 		return new IntermediateResultPartition(environment, partitionIndex, jobId, executionId,
 				partitionId, partitionType, partitionQueues, networkEnvironment);
 	}
+
+	@Override
+	public String toString() {
+		return String.format("IntermediateResultPartition(JobID: %s, ExecutionID: %s, " +
+				"PartitionID: %s, PartitionType: %s, num queues: %d, finished: %b)",
+				jobId, producerExecutionId, partitionId, partitionType, queues.length, isFinished);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
index 55a741be1cc02..46c690e7ec45e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
@@ -41,11 +41,15 @@ public class IntermediateResultPartitionManager implements IntermediateResultPar
 
 	private static final Logger LOG = LoggerFactory.getLogger(IntermediateResultPartitionManager.class);
 
-	public final Table<ExecutionAttemptID, IntermediateResultPartitionID, IntermediateResultPartition> partitions = HashBasedTable.create();
+	private final Table<ExecutionAttemptID, IntermediateResultPartitionID, IntermediateResultPartition> partitions = HashBasedTable.create();
 
 	private boolean isShutdown;
 
 	public void registerIntermediateResultPartition(IntermediateResultPartition partition) throws IOException {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Register intermediate result partition {}.", partition);
+		}
 		synchronized (partitions) {
 			if (isShutdown) {
 				throw new IOException("Intermediate result partition manager has already been shut down.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 616a8a5decd3e..39e8ee74332af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -215,8 +215,8 @@ private void checkIoError() throws IOException {
 		IOException error = ioError.get();
 
 		if (error != null) {
-			throw new IOException(String.format("%s at remote input channel of task '%s': %s].",
-					error.getClass().getName(), reader.getTaskNameWithSubtasks(), error.getMessage()));
+			throw new IOException(String.format("%s at remote input channel of task '%s'.",
+					error.getClass().getName(), reader.getTaskNameWithSubtasks()), error);
 		}
 	}
 
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9d157239fe259..3265007a073ec 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -385,7 +385,6 @@ import scala.collection.JavaConverters._
           startRegisteringTask = System.currentTimeMillis()
         }
 
-        manager.registerTask(jobID, executionID, tdd.getRequiredJarFiles) // triggers the download of all missing jar files from the job manager
         manager.registerTask(jobID, executionID, tdd.getRequiredJarFiles)
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 911edb3410944..01367f6f5ddf1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -81,6 +81,7 @@ public void testFailedRunThenSuccessfulRun() {
 			env.setNumberOfExecutionRetries(0);
 
 			env.generateSequence(1, 10)
+					.rebalance()
 					.map(new FailOnceMapper())
 					.reduce(new ReduceFunction<Long>() {
 						@Override
@@ -108,6 +109,7 @@ public Long reduce(Long value1, Long value2) {
 			env.setNumberOfExecutionRetries(0);
 
 			env.generateSequence(1, 10)
+					.rebalance()
 					.map(new FailOnceMapper())
 					.reduce(new ReduceFunction<Long>() {
 						@Override
@@ -156,6 +158,7 @@ public void testRestart() {
 			env.setNumberOfExecutionRetries(1);
 
 			env.generateSequence(1, 10)
+					.rebalance()
 					.map(new FailOnceMapper())
 					.reduce(new ReduceFunction<Long>() {
 						@Override
@@ -202,6 +205,7 @@ public void testRestartMultipleTimes() {
 			env.setNumberOfExecutionRetries(3);
 
 			env.generateSequence(1, 10)
+					.rebalance()
 					.map(new FailOnceMapper())
 					.reduce(new ReduceFunction<Long>() {
 						@Override
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index f69a6736f8ffb..d92fcba692a4a 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, file
+log4j.rootLogger=DEBUG, file
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')
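The essential part of this fix is the `.rebalance()` call: `generateSequence(1, 10)` may emit all ten elements from a single source split, so without an explicit redistribution some parallel mapper subtasks never receive an element, and the failing mapper never fails (and recovers) the way the test expects. A minimal sketch of the pattern outside the test, using a hypothetical job on the Flink DataSet API:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RebalanceSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // rebalance() redistributes the generated elements round-robin, so
        // every parallel subtask of the mapper receives at least one element.
        env.generateSequence(1, 10)
                .rebalance()
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return 2 * value;
                    }
                })
                .print();
    }
}
```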
From 534ebd39e8b0757f5a67f98b43f8d79d3ed02799 Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Mon, 23 Feb 2015 13:49:05 +0100
Subject: [PATCH 3/5] [FLINK-1604] [runtime] Fixes livelock in
 PartitionRequestClientFactory.createPartitionRequestClient

Replaces the recursive resolution of concurrent modifications by a while loop.
Turns off Akka's stdout logging.
Sends proper exceptions in ErrorResponse.
Closes streams properly.
---
 ...PartitionConsumerDeploymentDescriptor.java      |   8 ++
 .../PartitionDeploymentDescriptor.java             |   6 +
 .../runtime/deployment/PartitionInfo.java          |  13 +-
 .../deployment/TaskDeploymentDescriptor.java       |  23 ++++
 .../executiongraph/ExecutionVertex.java            |   2 +-
 .../runtime/io/network/ConnectionManager.java      |   6 +
 .../io/network/LocalConnectionManager.java         |   3 +
 .../io/network/api/reader/BufferReader.java        |  14 +++
 .../network/netty/NettyConnectionManager.java      |   5 +
 .../io/network/netty/NettyMessage.java             |  59 ++++++---
 .../network/netty/PartitionRequestClient.java      |   4 +
 .../netty/PartitionRequestClientFactory.java       | 114 +++++++++++-------
 .../netty/PartitionRequestServerHandler.java       |   4 +
 .../IntermediateResultPartition.java               |   7 +-
 .../IntermediateResultPartitionManager.java        |  14 ++-
 .../partition/consumer/LocalInputChannel.java      |   3 +-
 .../consumer/RemoteInputChannel.java               |   8 +-
 .../flink/runtime/taskmanager/Task.java            |  16 ++-
 .../AtomicDisposableReferenceCounter.java          |  12 +-
 .../flink/runtime/ActorLogMessages.scala           |   6 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala      |   2 +-
 .../runtime/taskmanager/TaskManager.scala          |  10 +-
 .../runtime/testingUtils/TestingUtils.scala        |   2 +-
 .../flink/yarn/YARNSessionFIFOITCase.java          |   2 +
 24 files changed, 259 insertions(+), 84 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionConsumerDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionConsumerDeploymentDescriptor.java
index 5b7c93a3c2c74..7300da4ea2c08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionConsumerDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionConsumerDeploymentDescriptor.java
@@ -25,6 +25,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 
 /**
  * A partition consumer deployment descriptor combines information of all partitions, which are
@@ -91,4 +92,11 @@ public void read(DataInputView in) throws IOException {
 
 		this.queueIndex = in.readInt();
 	}
+
+	@Override
+	public String toString() {
+		return String.format("PartitionConsumerDeploymentDescriptor(ResultID: %s, " +
+				"Queue index: %d, Partitions: %s)", resultId, queueIndex,
+				Arrays.toString(partitions));
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java
index 148f8d4c5d910..37651c95d9baf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java
@@ -113,4 +113,10 @@ public static PartitionDeploymentDescriptor fromIntermediateResultPartition(Inte
 		return new PartitionDeploymentDescriptor(partition.getIntermediateResult().getId(),
 				partitionId, partition.getIntermediateResult().getResultType(), numberOfQueues);
 	}
+
+	@Override
+	public String toString() {
+		return String.format("PartitionDeploymentDescriptor(ResultID: %s, PartitionID: %s, " +
+				"Partition type: %s)", resultId, partitionId, partitionType);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
index 2a0e8b1d362f0..1c6b53c80fc80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
@@ -29,6 +29,8 @@
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.RemoteAddress;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -41,6 +43,8 @@
  */
 public class PartitionInfo implements IOReadableWritable, Serializable {
 
+	private static Logger LOG = LoggerFactory.getLogger(PartitionInfo.class);
+
 	public enum PartitionLocation {
 		LOCAL, REMOTE, UNKNOWN
 	}
@@ -143,7 +147,14 @@ public static PartitionInfo fromEdge(ExecutionEdge edge, SimpleSlot consumerSlot
 			}
 		}
 
-		return new PartitionInfo(partitionId, producerExecutionId, producerLocation, producerAddress);
+		PartitionInfo partitionInfo = new PartitionInfo(partitionId, producerExecutionId,
+				producerLocation, producerAddress);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Create partition info {}.", partitionInfo);
+		}
+
+		return partitionInfo;
 	}
 
 	public static PartitionInfo[] fromEdges(ExecutionEdge[] edges, SimpleSlot consumerSlot) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 2432beaf2718c..20204d5a58515 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -201,4 +201,27 @@ public List getConsumedPartitions() {
 	public List getRequiredJarFiles() {
 		return requiredJarFiles;
 	}
+
+	@Override
+	public String toString() {
+		final StringBuilder pddBuilder = new StringBuilder("");
+		final StringBuilder pcddBuilder = new StringBuilder("");
+
+		for (PartitionDeploymentDescriptor pdd : producedPartitions) {
+			pddBuilder.append(pdd);
+		}
+
+		for (PartitionConsumerDeploymentDescriptor pcdd : consumedPartitions) {
+			pcddBuilder.append(pcdd);
+		}
+
+		final String strProducedPartitions = pddBuilder.toString();
+		final String strConsumedPartitions = pcddBuilder.toString();
+
+		return String.format("TaskDeploymentDescriptor(JobID: %s, JobVertexID: %s, " +
+				"ExecutionID: %s, Task name: %s, (%d/%d), Invokable: %s, " +
+				"Produced partitions: %s, Consumed partitions: %s)", jobID, vertexID, executionId,
+				taskName, indexInSubtaskGroup, numberOfSubtasks, invokableClassName,
+				strProducedPartitions, strConsumedPartitions);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 58515ffb78de8..8d2e7213a8a5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -376,7 +376,7 @@ public Iterable getPreferredLocations() {
 
 	public void resetForNewExecution() {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Resetting exection vertex {} for new execution.", getSimpleName());
+			LOG.debug("Resetting execution vertex {} for new execution.", getSimpleName());
 		}
 
 		synchronized (priorExecutions) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 4a5536bd2d384..d478e0f175aae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -36,6 +36,12 @@ public interface ConnectionManager {
 	 */
 	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException;
 
+	/**
+	 * Closes opened ChannelConnections in case of a resource release.
+	 *
+	 * @param remoteAddress identifying the connections to close
+	 */
+	void closeOpenChannelConnections(RemoteAddress remoteAddress);
+
 	int getNumberOfActiveConnections();
 
 	void shutdown() throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 894db350c8e6f..447f6e65ddd7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -38,6 +38,9 @@ public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteA
 		return null;
 	}
 
+	@Override
+	public void closeOpenChannelConnections(RemoteAddress remoteAddress) {}
+
 	@Override
 	public int getNumberOfActiveConnections() {
 		return 0;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index 7869868b1c81f..d21a623418867 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -479,6 +479,20 @@ public static BufferReader create(RuntimeEnvironment runtimeEnvironment, Network
 
 			final PartitionLocation producerLocation = partition.getProducerLocation();
 
+			if (LOG.isDebugEnabled()) {
+				switch (producerLocation) {
+					case LOCAL:
+						LOG.debug("Create LocalInputChannel for {}.", partition);
+						break;
+					case REMOTE:
+						LOG.debug("Create RemoteInputChannel for {}.", partition);
+						break;
+					case UNKNOWN:
+						LOG.debug("Create UnknownInputChannel for {}.", partition);
+						break;
+				}
+			}
+
 			switch (producerLocation) {
 				case LOCAL:
 					inputChannels[channelIndex] = new LocalInputChannel(channelIndex, producerExecutionId, partitionId, reader);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index bbb303bdd97ff..5d03c1580d05e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -53,6 +53,11 @@ public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteA
 		return partitionRequestClientFactory.createPartitionRequestClient(remoteAddress);
 	}
 
+	@Override
+	public void closeOpenChannelConnections(RemoteAddress remoteAddress) {
+		partitionRequestClientFactory.closeOpenChannelConnections(remoteAddress);
+	}
+
 	@Override
 	public int getNumberOfActiveConnections() {
 		return partitionRequestClientFactory.getNumberOfActiveClients();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 39a03ac406b9b..9783f112da4d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -28,6 +28,8 @@
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.MessageToMessageDecoder;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.event.task.TaskEvent;
@@ -36,10 +38,10 @@
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.StringUtils;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.List;
 
@@ -273,26 +275,26 @@ boolean isFatalError() {
 		ByteBuf write(ByteBufAllocator allocator) throws IOException {
 			ByteBuf result = null;
 
+			ObjectOutputStream oos = null;
+
 			try {
 				result = allocateBuffer(allocator, ID);
 
 				DataOutputView outputView = new ByteBufDataOutputView(result);
 
-				StringUtils.writeNullableString(error.getClass().getName(), outputView);
-				StringUtils.writeNullableString(error.getMessage(), outputView);
+				oos = new ObjectOutputStream(new DataOutputViewStream(outputView));
+
+				oos.writeObject(error);
 
 				if (receiverId != null) {
 					result.writeBoolean(true);
 					receiverId.writeTo(result);
-				}
-				else {
+				} else {
 					result.writeBoolean(false);
 				}
 
 				// Update frame length...
 				result.setInt(0, result.readableBytes());
-
-				return result;
 			}
 			catch (Throwable t) {
 				if (result != null) {
@@ -300,26 +302,39 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
 				}
 
 				throw new IOException(t);
+			} finally {
+				if (oos != null) {
+					oos.close();
+				}
 			}
+
+			return result;
 		}
 
 		@Override
 		void readFrom(ByteBuf buffer) throws Exception {
 			DataInputView inputView = new ByteBufDataInputView(buffer);
+			ObjectInputStream ois = null;
 
-			String errorClassName = StringUtils.readNullableString(inputView);
-			Class<? extends Throwable> errorClazz = (Class<? extends Throwable>) getClass().getClassLoader().loadClass(errorClassName);
+			try {
+				ois = new ObjectInputStream(new DataInputViewStream(inputView));
 
-			String errorMsg = StringUtils.readNullableString(inputView);
-			if (errorMsg != null) {
-				error = errorClazz.getConstructor(String.class).newInstance(errorMsg);
-			}
-			else {
-				error = InstantiationUtil.instantiate(errorClazz);
-			}
+				Object obj = ois.readObject();
 
-			if (buffer.readBoolean()) {
-				receiverId = InputChannelID.fromByteBuf(buffer);
+				if (!(obj instanceof Throwable)) {
+					throw new ClassCastException("Read object expected to be of type Throwable, " +
+							"actual type is " + obj.getClass() + ".");
+				} else {
+					error = (Throwable) obj;
+
+					if (buffer.readBoolean()) {
+						receiverId = InputChannelID.fromByteBuf(buffer);
+					}
+				}
+			} finally {
+				if (ois != null) {
+					ois.close();
+				}
 			}
 		}
 	}
@@ -380,6 +395,12 @@ public void readFrom(ByteBuf buffer) {
 		queueIndex = buffer.readInt();
 		receiverId = InputChannelID.fromByteBuf(buffer);
 	}
+
+	@Override
+	public String toString() {
+		return String.format("PartitionRequest(ProducerID: %s, PartitionID: %s)",
+				producerExecutionId, partitionId);
+	}
 }
 
 static class TaskEventRequest extends NettyMessage {
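Serializing the Throwable itself, instead of shipping only its class name and message, preserves the stack trace and the cause chain, and also works for exception classes that lack a (String) constructor. A self-contained round-trip sketch of what the new ErrorResponse encoding relies on, using plain java.io (independent of the patch):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class ThrowableRoundTrip {

    public static void main(String[] args) throws Exception {
        // The old scheme shipped only class name and message, losing the
        // stack trace and the cause chain. Serializing the Throwable itself
        // (as ErrorResponse now does) keeps both.
        Exception original = new IllegalStateException("partition released",
                new java.io.IOException("underlying channel error"));

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(original);
        oos.close();

        ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
        Throwable copy = (Throwable) ois.readObject();
        ois.close();

        System.out.println(copy.getMessage());               // partition released
        System.out.println(copy.getCause().getMessage());    // underlying channel error
        System.out.println(copy.getStackTrace().length > 0); // true
    }
}
```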
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 25021e2fb4fb5..f26f15c200f88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -60,6 +60,10 @@ public class PartitionRequestClient {
 		this.clientFactory = checkNotNull(clientFactory);
 	}
 
+	boolean disposeIfNotUsed() {
+		return closeReferenceCounter.disposeIfNotUsed();
+	}
+
 	/**
 	 * Increments the reference counter.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index dd8600429ce99..d64548d923ce7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -49,56 +49,65 @@ class PartitionRequestClientFactory {
 	 * creates a {@link PartitionRequestClient} instance for this connection.
	 */
	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
-		final Object entry = clients.get(remoteAddress);
+		Object entry;
+		PartitionRequestClient client = null;
 
-		final PartitionRequestClient client;
-		if (entry != null) {
-			// Existing channel or connecting channel
-			if (entry instanceof PartitionRequestClient) {
-				client = (PartitionRequestClient) entry;
-			}
-			else {
-				ConnectingChannel future = (ConnectingChannel) entry;
-				client = future.waitForChannel();
-			}
-		}
-		else {
-			// No channel yet. Create one, but watch out for a race.
-			// We create a "connecting future" and atomically add it to the map.
-			// Only the thread that really added it establishes the channel.
-			// The others need to wait on that original establisher's future.
-			ConnectingChannel connectingChannel = new ConnectingChannel(remoteAddress, this);
-			Object old = clients.putIfAbsent(remoteAddress, connectingChannel);
+		while (client == null) {
+			entry = clients.get(remoteAddress);
 
-			if (old == null) {
-				nettyClient.connect(remoteAddress.getAddress()).addListener(connectingChannel);
+			if (entry != null) {
+				// Existing channel or connecting channel
+				if (entry instanceof PartitionRequestClient) {
+					client = (PartitionRequestClient) entry;
+				} else {
+					ConnectingChannel future = (ConnectingChannel) entry;
+					client = future.waitForChannel();
 
-				client = connectingChannel.waitForChannel();
-
-				Object previous = clients.put(remoteAddress, client);
-
-				if (connectingChannel != previous) {
-					throw new IOException("Race condition while establishing channel connection.");
+					clients.replace(remoteAddress, future, client);
+				}
+			} else {
+				// No channel yet. Create one, but watch out for a race.
+				// We create a "connecting future" and atomically add it to the map.
+				// Only the thread that really added it establishes the channel.
+				// The others need to wait on that original establisher's future.
+				ConnectingChannel connectingChannel = new ConnectingChannel(remoteAddress, this);
+				Object old = clients.putIfAbsent(remoteAddress, connectingChannel);
+
+				if (old == null) {
+					nettyClient.connect(remoteAddress.getAddress()).addListener(connectingChannel);
+
+					client = connectingChannel.waitForChannel();
+
+					clients.replace(remoteAddress, connectingChannel, client);
+				} else if (old instanceof ConnectingChannel) {
+					client = ((ConnectingChannel) old).waitForChannel();
+
+					clients.replace(remoteAddress, old, client);
+				} else {
+					client = (PartitionRequestClient) old;
 				}
 			}
-			else if (old instanceof ConnectingChannel) {
-				client = ((ConnectingChannel) old).waitForChannel();
-			}
-			else {
-				client = (PartitionRequestClient) old;
+
+			// Make sure to increment the reference count before handing a client
+			// out to ensure correct bookkeeping for channel closing.
+			if (!client.incrementReferenceCounter()) {
+				destroyPartitionRequestClient(remoteAddress, client);
+				client = null;
 			}
 		}
 
-		// Make sure to increment the reference count before handing a client
-		// out to ensure correct bookkeeping for channel closing.
-		if (client.incrementReferenceCounter()) {
-			return client;
-		}
-		else {
-			// There was a race with a close, try again.
-			destroyPartitionRequestClient(remoteAddress, client);
+		return client;
+	}
+
+	public void closeOpenChannelConnections(RemoteAddress remoteAddress) {
+		Object entry = clients.get(remoteAddress);
+
+		if (entry instanceof ConnectingChannel) {
+			ConnectingChannel channel = (ConnectingChannel) entry;
 
-			return createPartitionRequestClient(remoteAddress);
+			if (channel.dispose()) {
+				clients.remove(remoteAddress, channel);
+			}
 		}
 	}
 
@@ -121,17 +130,40 @@ private static final class ConnectingChannel implements ChannelFutureListener {
 
 		private final PartitionRequestClientFactory clientFactory;
 
+		private boolean disposeRequestClient = false;
+
 		public ConnectingChannel(RemoteAddress remoteAddress, PartitionRequestClientFactory clientFactory) {
 			this.remoteAddress = remoteAddress;
 			this.clientFactory = clientFactory;
 		}
 
+		private boolean dispose() {
+			boolean result;
+
+			synchronized (connectLock) {
+				if (partitionRequestClient != null) {
+					result = partitionRequestClient.disposeIfNotUsed();
+				} else {
+					disposeRequestClient = true;
+					result = true;
+				}
+
+				connectLock.notifyAll();
+			}
+
+			return result;
+		}
+
 		private void handInChannel(Channel channel) {
 			synchronized (connectLock) {
 				PartitionRequestClientHandler requestHandler = (PartitionRequestClientHandler) channel.pipeline()
						.get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
 
 				partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory);
+
+				if (disposeRequestClient) {
+					partitionRequestClient.disposeIfNotUsed();
+				}
+
 				connectLock.notifyAll();
 			}
 		}
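The reworked method trades recursion for a retry loop: a thread that loses the race against a concurrent close destroys the stale entry and simply iterates again, so the stack depth stays constant no matter how often the race is lost. A compact, self-contained sketch of that pattern; the names (RefCountedCache, Entry) are illustrative, not Flink API:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Minimal sketch of the retry pattern above: loop until we obtain an entry
// whose reference count we can still increment, instead of recursing when a
// concurrent close wins the race.
class RefCountedCache<K, V extends RefCountedCache.Entry> {

    interface Entry {
        boolean incrementReferenceCounter(); // false once disposed
        void destroy();
    }

    private final ConcurrentMap<K, V> entries = new ConcurrentHashMap<K, V>();

    V get(K key, Callable<V> creator) throws Exception {
        V entry;

        while (true) {
            entry = entries.get(key);

            if (entry == null) {
                V created = creator.call();
                V previous = entries.putIfAbsent(key, created);
                entry = (previous == null) ? created : previous;
            }

            // A concurrent close may have disposed the entry between the map
            // lookup and this call; if so, drop it and try again. No recursion,
            // so the stack stays flat even under heavy contention.
            if (entry.incrementReferenceCounter()) {
                return entry;
            } else {
                entries.remove(key, entry);
                entry.destroy();
            }
        }
    }
}
```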
resources of {}.", this); - } synchronized (queues) { + if(LOG.isDebugEnabled()) { + LOG.debug("Release all resources of {}.", this); + } + if (!isReleased) { try { for (IntermediateResultPartitionQueue queue : queues) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java index 46c690e7ec45e..26836490d1b78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** @@ -47,10 +48,11 @@ public class IntermediateResultPartitionManager implements IntermediateResultPar private boolean isShutdown; public void registerIntermediateResultPartition(IntermediateResultPartition partition) throws IOException { - if(LOG.isDebugEnabled()){ - LOG.debug("Register intermediate result partition {}.", partition); - } synchronized (partitions) { + if(LOG.isDebugEnabled()){ + LOG.debug("Register intermediate result partition {}.", partition); + } + if (isShutdown) { throw new IOException("Intermediate result partition manager has already been shut down."); } @@ -123,6 +125,12 @@ public IntermediateResultPartitionQueueIterator getIntermediateResultPartitionIt if (partition == null) { if (!partitions.containsRow(producerExecutionId)) { + if(LOG.isDebugEnabled()) { + LOG.debug("Could not find producer execution ID {}. Registered producer" + + " execution IDs {}.", producerExecutionId, + Arrays.toString(partitions.rowKeySet().toArray())); + } + throw new IllegalQueueIteratorRequestException("Unknown producer execution ID " + producerExecutionId + "."); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 150aaea7dc9c0..86415ab0180e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -58,7 +58,8 @@ public LocalInputChannel(int channelIndex, ExecutionAttemptID producerExecutionI public void requestIntermediateResultPartition(int queueIndex) throws IOException { if (queueIterator == null) { if (LOG.isDebugEnabled()) { - LOG.debug("Requesting queue {} from LOCAL partition {}.", partitionId, queueIndex); + LOG.debug("Requesting LOCAL queue {} from partition {} produced by {}.", queueIndex, partitionId, + producerExecutionId); } queueIterator = reader.getIntermediateResultPartitionProvider() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 39e8ee74332af..4a3c00b79aeb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -64,7 +64,6 @@ public RemoteInputChannel( IntermediateResultPartitionID partitionId, 
BufferReader reader, RemoteAddress producerAddress) { - super(channelIndex, producerExecutionId, partitionId, reader); /** @@ -83,7 +82,8 @@ public RemoteInputChannel( public void requestIntermediateResultPartition(int queueIndex) throws IOException { if (partitionRequestClient == null) { if (LOG.isDebugEnabled()) { - LOG.debug("Requesting queue {} from REMOTE partition {}.", partitionId, queueIndex); + LOG.debug("Requesting REMOTE queue {} from partition {} produced by {}.", queueIndex, partitionId, + producerExecutionId); } partitionRequestClient = reader.getConnectionManager().createPartitionRequestClient(producerAddress); @@ -149,13 +149,15 @@ public void releaseAllResources() throws IOException { if (partitionRequestClient != null) { partitionRequestClient.close(this); + } else { + reader.getConnectionManager().closeOpenChannelConnections(producerAddress); } } } @Override public String toString() { - return "REMOTE " + id + " " + super.toString(); + return "REMOTE " + id + " " + producerAddress + " " + super.toString(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 715515ed69d90..9d756776c49c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -144,11 +144,20 @@ public boolean isCanceledOrFailed() { } public String getTaskName() { - return taskName; + if (LOG.isDebugEnabled()) { + return taskName + " (" + executionId + ")"; + } else { + return taskName; + } } public String getTaskNameWithSubtasks() { - return this.taskName + " (" + (this.subtaskIndex + 1) + "/" + this.numberOfSubtasks + ")"; + if (LOG.isDebugEnabled()) { + return this.taskName + " (" + (this.subtaskIndex + 1) + "/" + this.numberOfSubtasks + + ") (" + executionId + ")"; + } else { + return this.taskName + " (" + (this.subtaskIndex + 1) + "/" + this.numberOfSubtasks + ")"; + } } // ---------------------------------------------------------------------------------------------------------------- @@ -332,7 +341,8 @@ public void unregisterMemoryManager(MemoryManager memoryManager) { protected void notifyExecutionStateChange(ExecutionState executionState, Throwable optionalError) { - LOG.info("Update execution state to " + executionState); + LOG.info("Update execution state of {} ({}) to {}.", this.getTaskName(), + this.getExecutionId(), executionState); taskManager.tell(new JobManagerMessages.UpdateTaskExecutionState( new TaskExecutionState(jobId, executionId, executionState, optionalError)), ActorRef.noSender()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java index 8ca154488c6e1..ee5f281b6fc57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounter.java @@ -61,7 +61,17 @@ public boolean decrementReferenceCounter() { referenceCounter--; - if (referenceCounter == 0) { + if (referenceCounter <= 0) { + isDisposed = true; + } + + return isDisposed; + } + } + + public boolean disposeIfNotUsed() { + synchronized (lock) { + if(referenceCounter <= 0){ isDisposed = true; } diff --git 
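disposeIfNotUsed() is the piece that lets closeOpenChannelConnections() reclaim a connection that was initiated but never handed out. A condensed sketch of the counter's contract, under the same assumptions as the diff above (illustrative, not the full Flink class):

```java
// Illustrative sketch of the dispose-if-unused semantics, assuming the same
// increment/decrement contract as Flink's AtomicDisposableReferenceCounter.
class DisposableRefCounter {

    private final Object lock = new Object();

    private int referenceCounter;

    private boolean isDisposed;

    // Returns false once the counter has been disposed; callers must then
    // discard the guarded resource and retry (see the factory loop above).
    boolean increment() {
        synchronized (lock) {
            if (isDisposed) {
                return false;
            }
            referenceCounter++;
            return true;
        }
    }

    // Returns true when this decrement disposed the resource.
    boolean decrement() {
        synchronized (lock) {
            referenceCounter--;
            if (referenceCounter <= 0) {
                isDisposed = true;
            }
            return isDisposed;
        }
    }

    // Disposes eagerly, but only if nobody currently holds a reference.
    boolean disposeIfNotUsed() {
        synchronized (lock) {
            if (referenceCounter <= 0) {
                isDisposed = true;
            }
            return isDisposed;
        }
    }
}
```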
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index 15ce056ab926b..5d4f89ca82b4a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -25,7 +25,7 @@ import _root_.akka.event.LoggingAdapter
  * Mixin to add debug message logging
  */
 trait ActorLogMessages {
-  self: Actor =>
+  that: Actor =>
 
   override def receive: Receive = new Actor.Receive {
     private val _receiveWithLogMessages = receiveWithLogMessages
@@ -37,14 +37,14 @@ trait ActorLogMessages {
         _receiveWithLogMessages(x)
       }
       else {
-        log.debug(s"Received message $x from ${self.sender}.")
+        log.debug(s"Received message $x at ${that.self.path} from ${that.sender}.")
 
         val start = System.nanoTime()
 
         _receiveWithLogMessages(x)
 
         val duration = (System.nanoTime() - start) / 1000000
-        log.debug(s"Handled message $x in $duration ms from ${self.sender}.")
+        log.debug(s"Handled message $x in $duration ms from ${that.sender}.")
       }
     }
   }
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 7aa842b06bc83..014cf5791d706 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -146,7 +146,7 @@ object AkkaUtils {
     | serialize-messages = off
     |
     | loglevel = $logLevel
-    | stdout-loglevel = WARNING
+    | stdout-loglevel = OFF
     |
     | log-dead-letters = $logLifecycleEvents
     | log-dead-letters-during-shutdown = $logLifecycleEvents
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3265007a073ec..4e608faa3ab9a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -94,7 +94,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 
   import context._
   import taskManagerConfig.{timeout => tmTimeout, _}
-import scala.collection.JavaConverters._
+  import scala.collection.JavaConverters._
 
   implicit val timeout = tmTimeout
 
@@ -389,7 +389,7 @@ import scala.collection.JavaConverters._
         manager.registerTask(jobID, executionID, tdd.getRequiredJarFiles)
 
         if (log.isDebugEnabled) {
-          log.debug("Register task {} took {}s", executionID,
+          log.debug("Register task {} at library cache manager took {}s", executionID,
            (System.currentTimeMillis() - startRegisteringTask) / 1000.0)
         }
 
@@ -426,7 +426,11 @@ import scala.collection.JavaConverters._
 
         // register the task with the network stack and profiles
         networkEnvironment match {
-          case Some(ne) => ne.registerTask(task)
+          case Some(ne) =>
+            if (log.isDebugEnabled) {
+              log.debug("Register task {} on {}.", task, connectionInfo)
+            }
+            ne.registerTask(task)
           case None => throw new RuntimeException(
             "Network environment has not been properly instantiated.")
         }
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 147cc8a51f68c..368ac4df1f4ef 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -49,7 +49,7 @@ object TestingUtils {
       |akka.test.timefactor = 10
       |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
       |akka.loglevel = $logLevel
-      |akka.stdout-loglevel = WARNING
+      |akka.stdout-loglevel = OFF
       |akka.jvm-exit-on-fata-error = off
      |akka.log-config-on-start = off
     """.stripMargin
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index a365fbf59c98d..f5400cf88e4df 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -30,6 +30,7 @@
 import org.apache.log4j.spi.LoggingEvent;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +44,7 @@
  * This test starts a MiniYARNCluster with a FIFO scheudler.
 * There are no queues for that scheduler.
 */
+@Ignore("Because it fails :-(")
 public class YARNSessionFIFOITCase extends YarnTestBase {
	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);

From 65e22a417c29efda650440c53c3fa0f3767bd9b9 Mon Sep 17 00:00:00 2001
From: Ufuk Celebi
Date: Tue, 24 Feb 2015 12:41:35 +0100
Subject: [PATCH 4/5] [FLINK-1604] [FLINK-1568] Add initial connect failure
 test
---
 .../runtime/executiongraph/Execution.java          |   5 +
 .../executiongraph/ExecutionGraph.java             |   4 +
 .../io/network/NetworkEnvironment.java             |   3 +-
 .../runtime/taskmanager/TaskManager.scala          |   7 +-
 .../PartitionRequestClientFactoryTest.java         | 182 ++++++++++++++++++
 .../YARNSessionCapacitySchedulerITCase.java        |   2 +
 6 files changed, 199 insertions(+), 4 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 56f9416a25543..9071a23aea301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -802,6 +802,11 @@ private boolean transitionState(ExecutionState currentState, ExecutionState targ
 		if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
 			markTimestamp(targetState);
 
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("{} ({}) switched from {} to {}.", this.getVertex().getTaskName(),
+						getAttemptId(), currentState, targetState);
+			}
+
 			// make sure that the state transition completes normally.
 			// potential errors (in listeners may not affect the main logic)
 			try {
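The new debug line sits inside the successful compare-and-swap branch, so each state transition is logged exactly once even when several threads race on the same Execution. A minimal sketch of that pattern with an AtomicReference; Flink's Execution uses an AtomicReferenceFieldUpdater, and the simplification here is deliberate:

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of the compare-and-swap state machine used above. Logging
// inside the successful CAS branch guarantees each transition is reported
// exactly once, even when several threads race.
class StateMachine {

    enum State { CREATED, DEPLOYING, RUNNING, CANCELING, FAILED }

    private final AtomicReference<State> state = new AtomicReference<State>(State.CREATED);

    boolean transitionState(State current, State target) {
        if (state.compareAndSet(current, target)) {
            System.out.printf("switched from %s to %s.%n", current, target);
            return true;
        }
        // Another thread changed the state first; the caller re-reads and decides.
        return false;
    }
}
```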
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f7b13fd01ca54..e6d9c85d88f66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -422,6 +422,10 @@ private boolean transitionState(JobStatus current, JobStatus newState) {
 
 	private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
 		if (STATE_UPDATER.compareAndSet(this, current, newState)) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("{} switched from {} to {}.", this.getJobName(), current, newState);
+			}
+
 			stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
 			notifyJobStatusChange(newState, error);
 			return true;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 74a448ac3127c..94d726857c10f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -180,7 +180,8 @@ public void registerTask(Task task) throws IOException {
 
 	public void unregisterTask(Task task) {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Unregistering task {} ({}) from network environment (state: {}).", task.getTaskNameWithSubtasks(), task.getExecutionId(), task.getExecutionState());
+			LOG.debug("Unregistering task {} from network environment (state: {}).",
+					task.getTaskNameWithSubtasks(), task.getExecutionState());
 		}
 
 		final ExecutionAttemptID executionId = task.getExecutionId();
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 4e608faa3ab9a..4af8df99cd982 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -542,14 +542,15 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
           reader.updateInputChannel(partitionInfo)
         } catch {
           case t: Throwable =>
-            log.error(t, "Task update failure. Trying to cancel task.")
+            log.error(t, "Could not update task {}. Trying to cancel task.",
+              task.getTaskName)
 
             try {
-              task.cancelExecution()
+              task.markFailed(t)
             } catch {
               case t: Throwable =>
                 log.error(t, "Failed canceling task with execution ID {} after task" +
-                  "update failure..", executionId)
+                  "update failure.", executionId)
             }
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
new file mode 100644
index 0000000000000..2f45d6bec93f9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.RemoteAddress;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class PartitionRequestClientFactoryTest {
+
+	private final static int SERVER_PORT = 10021;
+
+	@Test
+	public void testResourceReleaseAfterInterruptedConnect() throws Exception {
+
+		// Latch to synchronize on the connect call.
+		final CountDownLatch syncOnConnect = new CountDownLatch(1);
+
+		final Tuple2<NettyServer, NettyClient> netty = createNettyServerAndClient(
+				new NettyProtocol() {
+					@Override
+					public void setServerChannelPipeline(ChannelPipeline channelPipeline) {
+					}
+
+					@Override
+					public void setClientChannelPipeline(ChannelPipeline channelPipeline) {
+						channelPipeline.addLast(new CountDownLatchOnConnectHandler(syncOnConnect));
+					}
+				});
+
+		final NettyServer server = netty.f0;
+		final NettyClient client = netty.f1;
+
+		final UncaughtTestExceptionHandler exceptionHandler = new UncaughtTestExceptionHandler();
+
+		try {
+			final PartitionRequestClientFactory factory = new PartitionRequestClientFactory(client);
+
+			final Thread connect = new Thread(new Runnable() {
+				@Override
+				public void run() {
+					RemoteAddress serverAddress = null;
+
+					try {
+						serverAddress = createServerRemoteAddress(0);
+
+						// This triggers a connect
+						factory.createPartitionRequestClient(serverAddress);
+					}
+					catch (Throwable t) {
+						if (serverAddress != null) {
+							factory.closeOpenChannelConnections(serverAddress);
+							Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
+						} else {
+							t.printStackTrace();
+							fail("Could not create RemoteAddress for server.");
+						}
+					}
+				}
+			});
+
+			connect.setUncaughtExceptionHandler(exceptionHandler);
+
+			connect.start();
+
+			// Wait on the connect
+			syncOnConnect.await();
+
+			connect.interrupt();
+			connect.join();
+
+			// Make sure that after a failed connect all resources are cleared.
+			assertEquals(0, factory.getNumberOfActiveClients());
+
+			// Make sure that the interrupt exception is not swallowed
+			assertTrue(exceptionHandler.getErrors().size() > 0);
+		}
+		finally {
+			if (server != null) {
+				server.shutdown();
+			}
+
+			if (client != null) {
+				client.shutdown();
+			}
+		}
+	}
+
+	private static class CountDownLatchOnConnectHandler extends ChannelOutboundHandlerAdapter {
+
+		private final CountDownLatch syncOnConnect;
+
+		public CountDownLatchOnConnectHandler(CountDownLatch syncOnConnect) {
+			this.syncOnConnect = syncOnConnect;
+		}
+
+		@Override
+		public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
+			syncOnConnect.countDown();
+		}
+	}
+
+	private static class UncaughtTestExceptionHandler implements UncaughtExceptionHandler {
+
+		private final List<Throwable> errors = new ArrayList<Throwable>(1);
+
+		@Override
+		public void uncaughtException(Thread t, Throwable e) {
+			errors.add(e);
+		}
+
+		private List<Throwable> getErrors() {
+			return errors;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static Tuple2<NettyServer, NettyClient> createNettyServerAndClient(NettyProtocol protocol) throws IOException {
+		final NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), SERVER_PORT, 32 * 1024, new Configuration());
+
+		final NettyServer server = new NettyServer(config);
+		final NettyClient client = new NettyClient(config);
+
+		boolean success = false;
+
+		try {
+			server.init(protocol);
+			client.init(protocol);
+
+			success = true;
+		}
+		finally {
+			if (!success) {
+				server.shutdown();
+				client.shutdown();
+			}
+		}
+
+		return new Tuple2<NettyServer, NettyClient>(server, client);
+	}
+
+	private static RemoteAddress createServerRemoteAddress(int connectionIndex) throws UnknownHostException {
+		return new RemoteAddress(new InetSocketAddress(InetAddress.getLocalHost(), SERVER_PORT), connectionIndex);
+	}
+}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index e4f82cd6ad33b..86ca6085ab250 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@
 * This test starts a MiniYARNCluster with a CapacityScheduler.
 * Is has, by default a queue called "default". The configuration here adds another queue: "qa-team".
 */
+@Ignore("Failing as well :-(")
 public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);

From 9c05f51a1bc3f5da5bd1bf101b10003afc1f8e5b Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Tue, 24 Feb 2015 15:58:25 +0100
Subject: [PATCH 5/5] [FLINK-1607] [runtime] Moves PartialPartitionInfo from
 ExecutionVertex to Execution to automatically clear them in case of restart

This closes #436.
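The rationale in one picture: an Execution represents a single attempt, so any state stored on it is implicitly dropped when a restart creates the next attempt, whereas state on the long-lived ExecutionVertex would survive restarts. A minimal sketch of that lifetime argument (PartialPartitionInfo is stood in for by Object; the classes are illustrative):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of why the move clears cached infos on restart: each restart creates
// a brand-new Execution attempt, so state held per Execution starts out empty,
// while state on the (long-lived) vertex would leak across restarts.
class Vertex {

    private Execution currentExecution = new Execution();

    Execution getCurrentExecution() {
        return currentExecution;
    }

    void resetForNewExecution() {
        // The old attempt, and with it the cached partition infos, is dropped.
        currentExecution = new Execution();
    }

    static class Execution {
        final ConcurrentLinkedQueue<Object> partialPartitionInfos =
                new ConcurrentLinkedQueue<Object>();
    }
}
```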
 .../flink/runtime/executiongraph/Execution.java    | 13 ++++++++++---
 .../runtime/executiongraph/ExecutionVertex.java    | 13 +------------
 .../yarn/YARNSessionCapacitySchedulerITCase.java   |  2 --
 .../apache/flink/yarn/YARNSessionFIFOITCase.java   |  2 --
 tools/log4j-travis.properties                      |  2 +-
 5 files changed, 12 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 9071a23aea301..dc9d95ca11dc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -111,6 +111,7 @@ public class Execution implements Serializable {

 	private final FiniteDuration timeout;

+	private ConcurrentLinkedQueue<PartialPartitionInfo> partialPartitionInfos;

 	private volatile ExecutionState state = CREATED;

@@ -134,6 +135,8 @@ public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp,
 		markTimestamp(ExecutionState.CREATED, startTimestamp);

 		this.timeout = timeout;
+
+		this.partialPartitionInfos = new ConcurrentLinkedQueue<PartialPartitionInfo>();
 	}

 	// --------------------------------------------------------------------------------------------

@@ -188,6 +191,9 @@ public void prepareForArchiving() {
 			throw new IllegalStateException("Cannot archive Execution while the assigned resource is still running.");
 		}
 		assignedResource = null;
+
+		partialPartitionInfos.clear();
+		partialPartitionInfos = null;
 	}

 	// --------------------------------------------------------------------------------------------

@@ -595,10 +601,11 @@ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
 		}
 	}

-	void sendPartitionInfos() {
-		ConcurrentLinkedQueue<PartialPartitionInfo> partialPartitionInfos =
-				vertex.getPartialPartitionInfos();
+	void cachePartitionInfo(PartialPartitionInfo partitionInfo) {
+		partialPartitionInfos.add(partitionInfo);
+	}

+	void sendPartitionInfos() {
 		// check if the ExecutionVertex has already been archived and thus cleared the
 		// partial partition infos queue
 		if(partialPartitionInfos != null && !partialPartitionInfos.isEmpty()) {
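Note on the null check above: prepareForArchiving() clears the queue and then
sets the field to null, so a sendPartitionInfos() running after archiving must
tolerate a null field. A defensive variant of that guard, as a sketch with
hypothetical names (the patch itself reads the field directly):

    import java.util.concurrent.ConcurrentLinkedQueue;

    class GuardSketch {
        private volatile ConcurrentLinkedQueue<Object> cachedInfos =
                new ConcurrentLinkedQueue<Object>();

        void prepareForArchiving() {
            cachedInfos.clear();
            cachedInfos = null; // archived executions drop the cache
        }

        void sendPartitionInfos() {
            // Read the field once so a concurrent archive cannot null it
            // between the check and the use.
            ConcurrentLinkedQueue<Object> infos = cachedInfos;
            if (infos != null && !infos.isEmpty()) {
                // ... forward the cached partition infos to the consumers ...
            }
        }
    }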
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 8d2e7213a8a5a..97143b17a1430 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -47,7 +47,6 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;

 import static com.google.common.base.Preconditions.checkElementIndex;
@@ -75,8 +74,6 @@ public class ExecutionVertex implements Serializable {

 	private ExecutionEdge[][] inputEdges;

-	private ConcurrentLinkedQueue<PartialPartitionInfo> partialPartitionInfos;
-
 	private final int subTaskIndex;

 	private final List<Execution> priorExecutions;
@@ -114,8 +111,6 @@ public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,

 		this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];

-		this.partialPartitionInfos = new ConcurrentLinkedQueue<PartialPartitionInfo>();
-
 		this.priorExecutions = new CopyOnWriteArrayList<Execution>();

 		this.currentExecution = new Execution(this, 0, createTimestamp, timeout);
@@ -204,10 +199,6 @@ public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
 	}

-	public ConcurrentLinkedQueue<PartialPartitionInfo> getPartialPartitionInfos() {
-		return partialPartitionInfos;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Graph building
 	// --------------------------------------------------------------------------------------------
@@ -451,12 +442,10 @@ public void prepareForArchiving() throws IllegalStateException {
 		this.resultPartitions = null;
 		this.inputEdges = null;
 		this.locationConstraintInstances = null;
-
-		this.partialPartitionInfos.clear();
-		this.partialPartitionInfos = null;
 	}

 	public void cachePartitionInfo(PartialPartitionInfo partitionInfo){
-		this.partialPartitionInfos.add(partitionInfo);
+		getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
 	}

 	void sendPartitionInfos() {
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 86ca6085ab250..e4f82cd6ad33b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -21,7 +21,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +33,6 @@
  * This test starts a MiniYARNCluster with a CapacityScheduler.
  * Is has, by default a queue called "default". The configuration here adds another queue: "qa-team".
  */
-@Ignore("Failing as well :-(")
 public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index f5400cf88e4df..a365fbf59c98d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -30,7 +30,6 @@
 import org.apache.log4j.spi.LoggingEvent;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +43,6 @@
  * This test starts a MiniYARNCluster with a FIFO scheudler.
  * There are no queues for that scheduler.
  */
-@Ignore("Because if fails :-(")
 public class YARNSessionFIFOITCase extends YarnTestBase {
 	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index d92fcba692a4a..f69a6736f8ffb 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################

-log4j.rootLogger=DEBUG, file
+log4j.rootLogger=INFO, file

 # -----------------------------------------------------------------------------
 # Console (use 'console')
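Note on the logging change: this appears to revert the temporary DEBUG root
level enabled earlier in this series while the test instabilities were being
hunted; INFO keeps the Travis logs readable. Should one component need verbose
output again, a per-logger override suffices (a sketch in standard log4j 1.x
properties syntax; the package name is only an example):

    log4j.rootLogger=INFO, file
    log4j.logger.org.apache.flink.runtime.io.network=DEBUG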