From 31f672171bd44f86c4bd31bd383077141840e2a6 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 27 Jul 2017 18:31:13 +0200 Subject: [PATCH 1/4] [FLINK-7287][tests] fix checks ignoring a JobCancellationException --- .../streaming/connectors/kafka/KafkaConsumerTestBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index dac45f7cf9f4e..0fbfefa963404 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -217,7 +217,7 @@ public void run() { env.execute(); } catch (Throwable t) { - if (!(t.getCause() instanceof JobCancellationException)) { + if (!(t instanceof JobCancellationException)) { errorRef.set(t); } } @@ -303,7 +303,7 @@ public void run() { env.execute(); } catch (Throwable t) { - if (!(t.getCause() instanceof JobCancellationException)) { + if (!(t instanceof JobCancellationException)) { errorRef.set(t); } } From db4ec9f1237e704b761404b004a25df8f924e546 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 27 Jul 2017 18:32:14 +0200 Subject: [PATCH 2/4] [FLINK-7287][tests] fix main test threads not waiting for the job execution thread to finish --- .../connectors/kafka/KafkaConsumerTestBase.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 0fbfefa963404..27d97d22d98cf 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -243,8 +243,9 @@ public void run() { } while (System.nanoTime() < deadline); - // cancel the job + // cancel the job & wait for the job to finish JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + runner.join(); final Throwable t = errorRef.get(); if (t != null) { @@ -328,8 +329,9 @@ public void run() { } while (System.nanoTime() < deadline); - // cancel the job + // cancel the job & wait for the job to finish JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + runner.join(); final Throwable t = errorRef.get(); if (t != null) { @@ -1660,9 +1662,9 @@ public void cancel() { JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); } - while (jobThread.isAlive()) { - Thread.sleep(50); - } + // wait for the job to finish (it should due to the cancel command above) + jobThread.join(); + if (error.f0 != null) { throw error.f0; } From acd29f357ae8d9a9d81aa8cf956c72cd569ceabb Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 28 Jul 2017 10:29:53 +0200 Subject: [PATCH 3/4] [FLINK-7287][tests] reduce the debug output in case of expected exceptions --- .../flink/streaming/connectors/kafka/KafkaConsumerTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 27d97d22d98cf..049c1f627922c 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -1609,8 +1609,8 @@ public void cancel() { env1.execute("Metrics test job"); } catch (Throwable t) { - LOG.warn("Got exception during execution", t); if (!(t instanceof JobCancellationException)) { // we'll cancel the job + LOG.warn("Got exception during execution", t); error.f0 = t; } } From 3762403836dfd2993e65a4f58658bd8f78ea1993 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Fri, 28 Jul 2017 10:30:17 +0200 Subject: [PATCH 4/4] [FLINK-7287][tests] always wait for the cancelled job to finish --- .../streaming/connectors/kafka/KafkaConsumerTestBase.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 049c1f627922c..fda68323f51bd 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -1660,11 +1660,10 @@ public void cancel() { } finally { // cancel JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + // wait for the job to finish (it should due to the cancel command above) + jobThread.join(); } - // wait for the job to finish (it should due to the cancel command above) - jobThread.join(); - if (error.f0 != null) { throw error.f0; }