From dba969aea20051e56436f4791aa81348f3167d9a Mon Sep 17 00:00:00 2001 From: harker2015 Date: Mon, 19 Sep 2022 10:40:10 +0200 Subject: [PATCH 1/2] [FLINK-29324] Fix NPE for Kinesis connector when closing --- .../streaming/connectors/kinesis/FlinkKinesisConsumer.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index b0a729fb11eb5..488a1f54e85c4 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -418,7 +418,12 @@ public void cancel() { public void close() throws Exception { cancel(); // safe-guard when the fetcher has been interrupted, make sure to not leak resources - fetcher.awaitTermination(); + // application might be stopped before connector subtask has been started + // so we must check if the fetcher is actually created + KinesisDataFetcher fetcher = this.fetcher; + if (fetcher != null) { + fetcher.awaitTermination(); + } this.fetcher = null; super.close(); } From dc4d2560d7cc81edac009c73f4dd5e0b0b0f2865 Mon Sep 17 00:00:00 2001 From: harker2015 Date: Mon, 19 Sep 2022 11:03:25 +0200 Subject: [PATCH 2/2] [FLINK-29324] Add unit test case --- .../connectors/kinesis/FlinkKinesisConsumerTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index deef7b38057aa..51367a6a110ee 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -1184,6 +1184,14 @@ public void emitWatermark(Watermark mark) { testHarness.close(); } + @Test + public void testCloseConnectorBeforeSubtaskStart() throws Exception { + Properties config = TestUtils.getStandardProperties(); + FlinkKinesisConsumer consumer = + new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); + consumer.close(); + } + private void awaitRecordCount(ConcurrentLinkedQueue queue, int count) throws Exception { Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));