diff --git a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java index f31eac1df6c71..0c674f41b796d 100644 --- a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java @@ -30,6 +30,7 @@ import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -63,10 +64,12 @@ public static void main(String[] args) throws Exception { session.executeNonQueryStatement("flush"); // Create topic + final String topic1 = "topic1"; + final String topic2 = "`topic2`"; try (SubscriptionSession subscriptionSession = new SubscriptionSession(LOCAL_HOST, 6667)) { subscriptionSession.open(); - subscriptionSession.createTopic("topic1"); - subscriptionSession.createTopic("topic2"); + subscriptionSession.createTopic(topic1); + subscriptionSession.createTopic(topic2); } // Subscription: property-style ctor @@ -75,7 +78,7 @@ public static void main(String[] args) throws Exception { config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); consumer1.open(); - consumer1.subscribe("topic1"); + consumer1.subscribe(topic1); while (true) { Thread.sleep(1000); // Wait for some time List messages = consumer1.poll(Duration.ofMillis(10000)); @@ -102,7 +105,7 @@ public static void main(String[] args) throws Exception { subscriptionSession.getSubscriptions().forEach((System.out::println)); } - consumer1.unsubscribe("topic1"); + consumer1.unsubscribe(topic1); consumer1.close(); // Subscription: builder-style ctor @@ -113,10 +116,11 @@ public static void main(String[] args) throws Exception { .autoCommit(false) .buildPullConsumer()) { consumer2.open(); - consumer2.subscribe("topic2"); + consumer2.subscribe(topic2); while (true) { Thread.sleep(1000); // wait some time - List messages = consumer2.poll(Duration.ofMillis(10000)); + List messages = + consumer2.poll(Collections.singleton(topic2), Duration.ofMillis(10000)); if (messages.isEmpty()) { break; } @@ -132,7 +136,7 @@ public static void main(String[] args) throws Exception { } consumer2.commitSync(messages); } - consumer2.unsubscribe("topic2"); + consumer2.unsubscribe(topic2); } // Query diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java index e8cc96c3f1248..42fb88adaf49f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java @@ -78,13 +78,14 @@ public void testTopicPathSubscription() throws Exception { } // Create topic on sender + final String topicName = "topic1"; final String host = senderEnv.getIP(); final int port = Integer.parseInt(senderEnv.getPort()); try (final SubscriptionSession session = new SubscriptionSession(host, port)) { session.open(); final Properties config = new Properties(); config.put(TopicConstant.PATH_KEY, "root.db.*.s"); - session.createTopic("topic1", config); + session.createTopic(topicName, config); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -105,7 +106,7 @@ public void testTopicPathSubscription() throws Exception { .buildPullConsumer(); final ISession session = receiverEnv.getSessionConnection()) { consumer.open(); - consumer.subscribe("topic1"); + consumer.subscribe(topicName); while (!isClosed.get()) { try { Thread.sleep(1000); // wait some time @@ -127,7 +128,7 @@ public void testTopicPathSubscription() throws Exception { } consumer.commitSync(messages); } - consumer.unsubscribe("topic1"); + consumer.unsubscribe(topicName); } catch (final Exception e) { e.printStackTrace(); // Avoid fail @@ -184,13 +185,14 @@ public void testTopicTimeSubscription() throws Exception { } // Create topic on sender + final String topicName = "topic2"; final String host = senderEnv.getIP(); final int port = Integer.parseInt(senderEnv.getPort()); try (final SubscriptionSession session = new SubscriptionSession(host, port)) { session.open(); final Properties config = new Properties(); config.put(TopicConstant.START_TIME_KEY, currentTime); - session.createTopic("topic1", config); + session.createTopic(topicName, config); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -211,7 +213,7 @@ public void testTopicTimeSubscription() throws Exception { .buildPullConsumer(); final ISession session = receiverEnv.getSessionConnection()) { consumer.open(); - consumer.subscribe("topic1"); + consumer.subscribe(topicName); while (!isClosed.get()) { try { Thread.sleep(1000); // wait some time @@ -233,7 +235,7 @@ public void testTopicTimeSubscription() throws Exception { } consumer.commitSync(messages); } - consumer.unsubscribe("topic1"); + consumer.unsubscribe(topicName); } catch (final Exception e) { e.printStackTrace(); // Avoid failure @@ -284,6 +286,7 @@ public void testTopicProcessorSubscription() throws Exception { } // Create topic + final String topicName = "topic3"; final String host = senderEnv.getIP(); final int port = Integer.parseInt(senderEnv.getPort()); try (final SubscriptionSession session = new SubscriptionSession(host, port)) { @@ -292,7 +295,7 @@ public void testTopicProcessorSubscription() throws Exception { config.put("processor", "tumbling-time-sampling-processor"); config.put("processor.tumbling-time.interval-seconds", "1"); config.put("processor.down-sampling.split-file", "true"); - session.createTopic("topic1", config); + session.createTopic(topicName, config); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -313,7 +316,7 @@ public void testTopicProcessorSubscription() throws Exception { .buildPullConsumer(); final ISession session = receiverEnv.getSessionConnection()) { consumer.open(); - consumer.subscribe("topic1"); + consumer.subscribe(topicName); while (!isClosed.get()) { try { Thread.sleep(1000); // wait some time @@ -335,7 +338,7 @@ public void testTopicProcessorSubscription() throws Exception { } consumer.commitSync(messages); } - consumer.unsubscribe("topic1"); + consumer.unsubscribe(topicName); } catch (final Exception e) { e.printStackTrace(); // Avoid failure @@ -373,4 +376,136 @@ public void testTopicProcessorSubscription() throws Exception { thread.join(); } } + + @Test + public void testTopicNameWithBackQuote() throws Exception { + // Insert some historical data on sender + try (final ISession session = senderEnv.getSessionConnection()) { + for (int i = 0; i < 100; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); + } + for (int i = 100; i < 200; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); + } + for (int i = 200; i < 300; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s) values (%s, 1)", i)); + } + session.executeNonQueryStatement("flush"); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Create topic on sender + final String topic1 = "`topic1`"; + final String topic2 = "`'topic2'`"; + final String topic3 = "`\"topic3\"`"; + final String host = senderEnv.getIP(); + final int port = Integer.parseInt(senderEnv.getPort()); + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + { + final Properties config = new Properties(); + config.put(TopicConstant.START_TIME_KEY, 0); + config.put(TopicConstant.END_TIME_KEY, 99); + session.createTopic(topic1, config); + } + { + final Properties config = new Properties(); + config.put(TopicConstant.START_TIME_KEY, 100); + config.put(TopicConstant.END_TIME_KEY, 199); + session.createTopic(topic2, config); + } + { + final Properties config = new Properties(); + config.put(TopicConstant.START_TIME_KEY, 200); + config.put(TopicConstant.END_TIME_KEY, 299); + session.createTopic(topic3, config); + } + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Subscribe on sender and insert on receiver + final Set topics = new HashSet<>(); + topics.add(topic1); + topics.add(topic2); + topics.add(topic3); + final AtomicBoolean isClosed = new AtomicBoolean(false); + final Thread thread = + new Thread( + () -> { + try (final SubscriptionPullConsumer consumer = + new SubscriptionPullConsumer.Builder() + .host(host) + .port(port) + .consumerId("c1") + .consumerGroupId("cg1") + .autoCommit(false) + .buildPullConsumer(); + final ISession session = receiverEnv.getSessionConnection()) { + consumer.open(); + consumer.subscribe(topics); + while (!isClosed.get()) { + try { + Thread.sleep(1000); // wait some time + } catch (final InterruptedException e) { + break; + } + final List messages = + consumer.poll(topics, Duration.ofMillis(10000)); + if (messages.isEmpty()) { + continue; + } + for (final SubscriptionMessage message : messages) { + final SubscriptionSessionDataSets payload = + (SubscriptionSessionDataSets) message.getPayload(); + for (final Iterator it = payload.tabletIterator(); it.hasNext(); ) { + final Tablet tablet = it.next(); + session.insertTablet(tablet); + } + } + consumer.commitSync(messages); + } + consumer.unsubscribe(topics); + } catch (final Exception e) { + e.printStackTrace(); + // Avoid failure + } finally { + LOGGER.info("consumer exiting..."); + } + }); + thread.start(); + + // Check data on receiver + try { + try (final Connection connection = receiverEnv.getConnection(); + final Statement statement = connection.createStatement()) { + // Keep retrying if there are execution failures + Awaitility.await() + .pollDelay(1, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .atMost(120, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertSingleResultSetEqual( + TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"), + new HashMap() { + { + put("count(root.db.d1.s)", "300"); + } + })); + } + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + isClosed.set(true); + thread.join(); + } + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java index 93a02c8b836f9..6938763c8f82d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java @@ -78,11 +78,12 @@ public void testBasicSubscription() throws Exception { } // Create topic + final String topicName = "topic1"; final String host = EnvFactory.getEnv().getIP(); final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); try (final SubscriptionSession session = new SubscriptionSession(host, port)) { session.open(); - session.createTopic("topic1"); + session.createTopic(topicName); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -103,7 +104,7 @@ public void testBasicSubscription() throws Exception { .autoCommit(false) .buildPullConsumer()) { consumer.open(); - consumer.subscribe("topic1"); + consumer.subscribe(topicName); while (!isClosed.get()) { try { Thread.sleep(1000); // wait some time @@ -127,7 +128,7 @@ public void testBasicSubscription() throws Exception { } consumer.commitSync(messages); } - consumer.unsubscribe("topic1"); + consumer.unsubscribe(topicName); } catch (final Exception e) { e.printStackTrace(); // Avoid failure diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java index ff5a86bc2206f..f97d642b384ed 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java @@ -58,6 +58,7 @@ public void testSubscribeOrUnsubscribeNonExistedTopicTest() { final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); // Subscribe non-existed topic + final String topicName = "topic1"; try (final SubscriptionPullConsumer consumer = new SubscriptionPullConsumer.Builder() .host(host) @@ -67,7 +68,7 @@ public void testSubscribeOrUnsubscribeNonExistedTopicTest() { .autoCommit(false) .buildPullConsumer()) { consumer.open(); - consumer.subscribe("topic1"); + consumer.subscribe(topicName); fail(); } catch (final Exception ignored) { } finally { @@ -84,7 +85,7 @@ public void testSubscribeOrUnsubscribeNonExistedTopicTest() { .autoCommit(false) .buildPullConsumer()) { consumer.open(); - consumer.unsubscribe("topic1"); + consumer.unsubscribe(topicName); fail(); } catch (final Exception ignored) { } finally { @@ -98,9 +99,10 @@ public void testSubscribeExistedSubscribedTopicTest() { final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); // Create topic + final String topicName = "topic2"; try (final SubscriptionSession session = new SubscriptionSession(host, port)) { session.open(); - session.createTopic("topic1"); + session.createTopic(topicName); } catch (final Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -115,9 +117,9 @@ public void testSubscribeExistedSubscribedTopicTest() { .autoCommit(false) .buildPullConsumer()) { consumer.open(); - consumer.subscribe("topic1"); + consumer.subscribe(topicName); // Subscribe existed subscribed topic - consumer.subscribe("topic1"); + consumer.subscribe(topicName); } catch (final Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -132,9 +134,10 @@ public void testUnsubscribeExistedNonSubscribedTopicTest() { final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); // Create topic + final String topicName = "topic3"; try (final SubscriptionSession session = new SubscriptionSession(host, port)) { session.open(); - session.createTopic("topic1"); + session.createTopic(topicName); } catch (final Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -150,7 +153,7 @@ public void testUnsubscribeExistedNonSubscribedTopicTest() { .buildPullConsumer()) { consumer.open(); // Unsubscribe existed non-subscribed topic - consumer.unsubscribe("topic1"); + consumer.unsubscribe(topicName); } catch (final Exception e) { e.printStackTrace(); Assert.fail(e.getMessage()); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java index 9e33a349e2723..c0ff9d67466f1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java @@ -88,9 +88,10 @@ public void testSubscriptionAfterRestartCluster() throws Exception { final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); // Create topic + final String topicName = "topic1"; try (final SubscriptionSession session = new SubscriptionSession(host, port)) { session.open(); - session.createTopic("topic1"); + session.createTopic(topicName); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -107,7 +108,7 @@ public void testSubscriptionAfterRestartCluster() throws Exception { .autoCommit(false) .buildPullConsumer(); consumer.open(); - consumer.subscribe("topic1"); + consumer.subscribe(topicName); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -181,7 +182,7 @@ public void testSubscriptionAfterRestartCluster() throws Exception { } consumer.commitSync(messages); } - consumer.unsubscribe("topic1"); + consumer.unsubscribe(topicName); } catch (final Exception e) { e.printStackTrace(); // Avoid failure @@ -215,9 +216,10 @@ public void testSubscriptionAfterRestartDataNode() throws Exception { final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); // Create topic + final String topicName = "topic2"; try (final SubscriptionSession session = new SubscriptionSession(host, port)) { session.open(); - session.createTopic("topic1"); + session.createTopic(topicName); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -237,7 +239,7 @@ public void testSubscriptionAfterRestartDataNode() throws Exception { .endpointsSyncIntervalMs(5000) // narrow endpoints sync interval .buildPullConsumer(); consumer.open(); - consumer.subscribe("topic1"); + consumer.subscribe(topicName); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -294,7 +296,7 @@ public void testSubscriptionAfterRestartDataNode() throws Exception { // Auto commit } } - consumerRef.unsubscribe("topic1"); + consumerRef.unsubscribe(topicName); } catch (final Exception e) { e.printStackTrace(); // Avoid failure @@ -346,9 +348,10 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception { final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); // Create topic + final String topicName = "topic3"; try (final SubscriptionSession session = new SubscriptionSession(host, port)) { session.open(); - session.createTopic("topic1"); + session.createTopic(topicName); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -368,7 +371,7 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception { .endpointsSyncIntervalMs(5000) // narrow endpoints sync interval .buildPullConsumer(); consumer.open(); - consumer.subscribe("topic1"); + consumer.subscribe(topicName); } catch (final Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -420,7 +423,7 @@ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception { // Auto commit } } - consumerRef.unsubscribe("topic1"); + consumerRef.unsubscribe(topicName); } catch (final Exception e) { e.printStackTrace(); // Avoid failure diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 8dfaf86d812ad..01640135b7b68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; +import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.db.subscription.broker.SerializedEnrichedEvent; import org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer; @@ -245,6 +246,7 @@ private TPipeSubscribeResp handlePipeSubscribeSubscribeInternal(PipeSubscribeSub // subscribe topics Set topicNames = req.getTopicNames(); + topicNames = topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet()); subscribe(consumerConfig, topicNames); LOGGER.info("Subscription: consumer {} subscribe {} successfully", consumerConfig, topicNames); @@ -278,6 +280,7 @@ private TPipeSubscribeResp handlePipeSubscribeUnsubscribeInternal( // unsubscribe topics Set topicNames = req.getTopicNames(); + topicNames = topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet()); unsubscribe(consumerConfig, topicNames); LOGGER.info( @@ -316,6 +319,8 @@ private TPipeSubscribeResp handlePipeSubscribePollInternal(PipeSubscribePollReq SubscriptionAgent.consumer() .getTopicsSubscribedByConsumer( consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId()); + } else { + topicNames = topicNames.stream().map(ASTVisitor::parseIdentifier).collect(Collectors.toSet()); } SubscriptionPollTimer timer = new SubscriptionPollTimer(