diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java index e445643..bedf97f 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java @@ -62,6 +62,7 @@ import java.lang.management.ManagementFactory; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -536,7 +537,7 @@ public void close() throws Exception { } } - public void initOffsetTableFromRestoredOffsets(List messageQueues) { + public void initOffsetTableFromRestoredOffsets(List messageQueues) throws MQClientException { Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null"); restoredOffsets.forEach( (mq, offset) -> { @@ -544,6 +545,17 @@ public void initOffsetTableFromRestoredOffsets(List messageQueues) offsetTable.put(mq, offset); } }); + + List extMessageQueue = new ArrayList<>(); + for (MessageQueue messageQueue : messageQueues) { + if (!offsetTable.containsKey(messageQueue)) { + extMessageQueue.add(messageQueue); + } + } + if (extMessageQueue.size() != 0) { + log.info("no restoredOffsets for {}, so init offset for these queues", extMessageQueue); + initOffsets(extMessageQueue); + } log.info("init offset table [{}] from restoredOffsets successful.", offsetTable); } diff --git a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java index cd514cd..08371b3 100644 --- a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java +++ b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java @@ -25,12 +25,16 @@ import org.apache.flink.connector.rocketmq.legacy.common.serialization.SimpleStringDeserializationSchema; import org.apache.flink.connector.rocketmq.legacy.common.util.TestUtils; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.common.message.MessageQueue; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -69,6 +73,8 @@ public void testSetStartupMode() throws NoSuchFieldException, IllegalAccessExcep @Test public void testRestartFromCheckpoint() throws Exception { + DefaultLitePullConsumer consumer = Mockito.mock(DefaultLitePullConsumer.class); + Mockito.when(consumer.committed(Mockito.any())).thenReturn(40L); Properties properties = new Properties(); properties.setProperty(RocketMQConfig.CONSUMER_GROUP, "${ConsumerGroup}"); properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, "${SourceTopic}"); @@ -82,13 +88,19 @@ public void testRestartFromCheckpoint() throws Exception { map.put(new MessageQueue("tpc", "broker-0", 1), 21L); map.put(new MessageQueue("tpc", "broker-1", 0), 30L); map.put(new MessageQueue("tpc", "broker-1", 1), 31L); + List allocateMessageQueues = new ArrayList<>(map.keySet()); + MessageQueue newMessageQueue = new MessageQueue("tpc", "broker-2", 0); + allocateMessageQueues.add(newMessageQueue); + TestUtils.setFieldValue(source, "messageQueues", allocateMessageQueues); + TestUtils.setFieldValue(source, "consumer", consumer); TestUtils.setFieldValue(source, "restoredOffsets", map); TestUtils.setFieldValue(source, "offsetTable", new ConcurrentHashMap<>()); - source.initOffsetTableFromRestoredOffsets(new ArrayList<>(map.keySet())); + source.initOffsetTableFromRestoredOffsets(allocateMessageQueues); Map offsetTable = (Map) TestUtils.getFieldValue(source, "offsetTable"); for (Map.Entry entry : map.entrySet()) { assertEquals(offsetTable.containsKey(entry.getKey()), true); assertEquals(offsetTable.containsValue(entry.getValue()), true); } + assertEquals(offsetTable.containsKey(newMessageQueue), true); } }