Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

Expand Down Expand Up @@ -63,10 +64,12 @@ public static void main(String[] args) throws Exception {
session.executeNonQueryStatement("flush");

// Create topic
final String topic1 = "topic1";
final String topic2 = "`topic2`";
try (SubscriptionSession subscriptionSession = new SubscriptionSession(LOCAL_HOST, 6667)) {
subscriptionSession.open();
subscriptionSession.createTopic("topic1");
subscriptionSession.createTopic("topic2");
subscriptionSession.createTopic(topic1);
subscriptionSession.createTopic(topic2);
}

// Subscription: property-style ctor
Expand All @@ -75,7 +78,7 @@ public static void main(String[] args) throws Exception {
config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config);
consumer1.open();
consumer1.subscribe("topic1");
consumer1.subscribe(topic1);
while (true) {
Thread.sleep(1000); // Wait for some time
List<SubscriptionMessage> messages = consumer1.poll(Duration.ofMillis(10000));
Expand All @@ -102,7 +105,7 @@ public static void main(String[] args) throws Exception {
subscriptionSession.getSubscriptions().forEach((System.out::println));
}

consumer1.unsubscribe("topic1");
consumer1.unsubscribe(topic1);
consumer1.close();

// Subscription: builder-style ctor
Expand All @@ -113,10 +116,11 @@ public static void main(String[] args) throws Exception {
.autoCommit(false)
.buildPullConsumer()) {
consumer2.open();
consumer2.subscribe("topic2");
consumer2.subscribe(topic2);
while (true) {
Thread.sleep(1000); // wait some time
List<SubscriptionMessage> messages = consumer2.poll(Duration.ofMillis(10000));
List<SubscriptionMessage> messages =
consumer2.poll(Collections.singleton(topic2), Duration.ofMillis(10000));
if (messages.isEmpty()) {
break;
}
Expand All @@ -132,7 +136,7 @@ public static void main(String[] args) throws Exception {
}
consumer2.commitSync(messages);
}
consumer2.unsubscribe("topic2");
consumer2.unsubscribe(topic2);
}

// Query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,14 @@ public void testTopicPathSubscription() throws Exception {
}

// Create topic on sender
final String topicName = "topic1";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.PATH_KEY, "root.db.*.s");
session.createTopic("topic1", config);
session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand All @@ -105,7 +106,7 @@ public void testTopicPathSubscription() throws Exception {
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe("topic1");
consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
Expand All @@ -127,7 +128,7 @@ public void testTopicPathSubscription() throws Exception {
}
consumer.commitSync(messages);
}
consumer.unsubscribe("topic1");
consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid fail
Expand Down Expand Up @@ -184,13 +185,14 @@ public void testTopicTimeSubscription() throws Exception {
}

// Create topic on sender
final String topicName = "topic2";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, currentTime);
session.createTopic("topic1", config);
session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand All @@ -211,7 +213,7 @@ public void testTopicTimeSubscription() throws Exception {
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe("topic1");
consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
Expand All @@ -233,7 +235,7 @@ public void testTopicTimeSubscription() throws Exception {
}
consumer.commitSync(messages);
}
consumer.unsubscribe("topic1");
consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
Expand Down Expand Up @@ -284,6 +286,7 @@ public void testTopicProcessorSubscription() throws Exception {
}

// Create topic
final String topicName = "topic3";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
Expand All @@ -292,7 +295,7 @@ public void testTopicProcessorSubscription() throws Exception {
config.put("processor", "tumbling-time-sampling-processor");
config.put("processor.tumbling-time.interval-seconds", "1");
config.put("processor.down-sampling.split-file", "true");
session.createTopic("topic1", config);
session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand All @@ -313,7 +316,7 @@ public void testTopicProcessorSubscription() throws Exception {
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe("topic1");
consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
Expand All @@ -335,7 +338,7 @@ public void testTopicProcessorSubscription() throws Exception {
}
consumer.commitSync(messages);
}
consumer.unsubscribe("topic1");
consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
Expand Down Expand Up @@ -373,4 +376,136 @@ public void testTopicProcessorSubscription() throws Exception {
thread.join();
}
}

@Test
public void testTopicNameWithBackQuote() throws Exception {
// Insert some historical data on sender
try (final ISession session = senderEnv.getSessionConnection()) {
for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s) values (%s, 1)", i));
}
for (int i = 100; i < 200; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s) values (%s, 1)", i));
}
for (int i = 200; i < 300; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}

// Create topic on sender
final String topic1 = "`topic1`";
final String topic2 = "`'topic2'`";
final String topic3 = "`\"topic3\"`";
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
{
final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, 0);
config.put(TopicConstant.END_TIME_KEY, 99);
session.createTopic(topic1, config);
}
{
final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, 100);
config.put(TopicConstant.END_TIME_KEY, 199);
session.createTopic(topic2, config);
}
{
final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, 200);
config.put(TopicConstant.END_TIME_KEY, 299);
session.createTopic(topic3, config);
}
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}

// Subscribe on sender and insert on receiver
final Set<String> topics = new HashSet<>();
topics.add(topic1);
topics.add(topic2);
topics.add(topic3);
final AtomicBoolean isClosed = new AtomicBoolean(false);
final Thread thread =
new Thread(
() -> {
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c1")
.consumerGroupId("cg1")
.autoCommit(false)
.buildPullConsumer();
final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe(topics);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
} catch (final InterruptedException e) {
break;
}
final List<SubscriptionMessage> messages =
consumer.poll(topics, Duration.ofMillis(10000));
if (messages.isEmpty()) {
continue;
}
for (final SubscriptionMessage message : messages) {
final SubscriptionSessionDataSets payload =
(SubscriptionSessionDataSets) message.getPayload();
for (final Iterator<Tablet> it = payload.tabletIterator(); it.hasNext(); ) {
final Tablet tablet = it.next();
session.insertTablet(tablet);
}
}
consumer.commitSync(messages);
}
consumer.unsubscribe(topics);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
});
thread.start();

// Check data on receiver
try {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
.pollDelay(1, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.atMost(120, TimeUnit.SECONDS)
.untilAsserted(
() ->
TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(statement, "select count(*) from root.**"),
new HashMap<String, String>() {
{
put("count(root.db.d1.s)", "300");
}
}));
}
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
isClosed.set(true);
thread.join();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@ public void testBasicSubscription() throws Exception {
}

// Create topic
final String topicName = "topic1";
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
session.createTopic("topic1");
session.createTopic(topicName);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand All @@ -103,7 +104,7 @@ public void testBasicSubscription() throws Exception {
.autoCommit(false)
.buildPullConsumer()) {
consumer.open();
consumer.subscribe("topic1");
consumer.subscribe(topicName);
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
Expand All @@ -127,7 +128,7 @@ public void testBasicSubscription() throws Exception {
}
consumer.commitSync(messages);
}
consumer.unsubscribe("topic1");
consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
Expand Down
Loading