diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index b0a729fb11eb5..488a1f54e85c4 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -418,7 +418,12 @@ public void cancel() { public void close() throws Exception { cancel(); // safe-guard when the fetcher has been interrupted, make sure to not leak resources - fetcher.awaitTermination(); + // application might be stopped before connector subtask has been started + // so we must check if the fetcher is actually created + KinesisDataFetcher fetcher = this.fetcher; + if (fetcher != null) { + fetcher.awaitTermination(); + } this.fetcher = null; super.close(); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index d48e04ed4e69a..6ad94f823e82e 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -1194,6 +1194,14 @@ public void emitWatermark(Watermark mark) { testHarness.close(); } + @Test + public void testCloseConnectorBeforeSubtaskStart() throws Exception { + Properties config = TestUtils.getStandardProperties(); + FlinkKinesisConsumer consumer = + new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); + consumer.close(); + } + private void awaitRecordCount(ConcurrentLinkedQueue queue, int count) throws Exception { Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));