From 71fea9a4522505a6c0f23f1de599b7f87a633ccf Mon Sep 17 00:00:00 2001 From: harker2015 Date: Tue, 20 Sep 2022 17:31:44 +0200 Subject: [PATCH] [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853) * [FLINK-29324] Fix NPE for Kinesis connector when closing * [FLINK-29324] Add unit test case --- .../connectors/kinesis/FlinkKinesisConsumer.java | 7 ++++++- .../connectors/kinesis/FlinkKinesisConsumerTest.java | 8 ++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index b0a729fb11eb5..488a1f54e85c4 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -418,7 +418,12 @@ public void cancel() { public void close() throws Exception { cancel(); // safe-guard when the fetcher has been interrupted, make sure to not leak resources - fetcher.awaitTermination(); + // application might be stopped before connector subtask has been started + // so we must check if the fetcher is actually created + KinesisDataFetcher fetcher = this.fetcher; + if (fetcher != null) { + fetcher.awaitTermination(); + } this.fetcher = null; super.close(); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index deef7b38057aa..51367a6a110ee 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -1184,6 +1184,14 @@ public void emitWatermark(Watermark mark) { testHarness.close(); } + @Test + public void testCloseConnectorBeforeSubtaskStart() throws Exception { + Properties config = TestUtils.getStandardProperties(); + FlinkKinesisConsumer consumer = + new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); + consumer.close(); + } + private void awaitRecordCount(ConcurrentLinkedQueue queue, int count) throws Exception { Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));