Skip to content

Commit

Permalink
[FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when …
Browse files Browse the repository at this point in the history
…closing (#20853)

* [FLINK-29324] Fix NPE for Kinesis connector when closing

* [FLINK-29324] Add unit test case
  • Loading branch information
harker2015 authored and dannycranmer committed Sep 20, 2022
1 parent 7784ea7 commit eb65655
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
Expand Up @@ -418,7 +418,12 @@ public void cancel() {
public void close() throws Exception {
cancel();
// safe-guard when the fetcher has been interrupted, make sure to not leak resources
fetcher.awaitTermination();
// application might be stopped before connector subtask has been started
// so we must check if the fetcher is actually created
KinesisDataFetcher fetcher = this.fetcher;
if (fetcher != null) {
fetcher.awaitTermination();
}
this.fetcher = null;
super.close();
}
Expand Down
Expand Up @@ -1194,6 +1194,14 @@ public void emitWatermark(Watermark mark) {
testHarness.close();
}

@Test
public void testCloseConnectorBeforeSubtaskStart() throws Exception {
Properties config = TestUtils.getStandardProperties();
FlinkKinesisConsumer<String> consumer =
new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
consumer.close();
}

private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count)
throws Exception {
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
Expand Down

0 comments on commit eb65655

Please sign in to comment.