From dc4d2560d7cc81edac009c73f4dd5e0b0b0f2865 Mon Sep 17 00:00:00 2001 From: harker2015 Date: Mon, 19 Sep 2022 11:03:25 +0200 Subject: [PATCH] [FLINK-29324] Add unit test case --- .../connectors/kinesis/FlinkKinesisConsumerTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index deef7b38057aa..51367a6a110ee 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -1184,6 +1184,14 @@ public void emitWatermark(Watermark mark) { testHarness.close(); } + @Test + public void testCloseConnectorBeforeSubtaskStart() throws Exception { + Properties config = TestUtils.getStandardProperties(); + FlinkKinesisConsumer consumer = + new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); + consumer.close(); + } + private void awaitRecordCount(ConcurrentLinkedQueue queue, int count) throws Exception { Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));