Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -552,4 +552,73 @@ public void testDataSetDeduplication() {
fail(e.getMessage());
}
}

// same to
// org.apache.iotdb.subscription.it.local.IoTDBSubscriptionBasicIT.testDataSetDeduplication,
// but missing consumer id & consumer group id when building consumer
@Test
public void testMissingConsumerId() {
// Insert some historical data
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
session.createDatabase("root.db");
for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s1, s2) values (%s, 1, 2)", i));
session.executeNonQueryStatement(
String.format("insert into root.db.d2(time, s1, s2) values (%s, 3, 4)", i));
}
// DO NOT FLUSH HERE
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}

// Create topic
final String topicName = "topic7";
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.PATTERN_KEY, "root.db.d1.s1");
session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}

// Subscription
final AtomicInteger rowCount = new AtomicInteger();
try (final SubscriptionPushConsumer consumer =
new SubscriptionPushConsumer.Builder()
.host(host)
.port(port)
.ackStrategy(AckStrategy.AFTER_CONSUME)
.consumeListener(
message -> {
for (final SubscriptionSessionDataSet dataSet :
message.getSessionDataSetsHandler()) {
while (dataSet.hasNext()) {
dataSet.next();
rowCount.addAndGet(1);
}
}
return ConsumeResult.SUCCESS;
})
.buildPushConsumer()) {

consumer.open();
consumer.subscribe(topicName);

AWAIT.untilAsserted(
() -> {
Assert.assertEquals(100, rowCount.get());
Assert.assertNotNull(consumer.getConsumerId());
Assert.assertNotNull(consumer.getConsumerGroupId());
});
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ public void testCreateConsumer_null() {
new SubscriptionPullConsumer(null).open();
}

@Test(expected = NullPointerException.class)
@Test(
expected =
SubscriptionConnectionException.class) // connect to TEndPoint(ip:localhost, port:6667)
public void testCreateConsumer_empty() {
new SubscriptionPullConsumer(new Properties()).open();
}
Expand All @@ -289,7 +291,7 @@ public void testCreateConsumer_empty2() {
new SubscriptionPullConsumer.Builder().buildPullConsumer().open();
}

@Test(expected = NullPointerException.class)
@Test(expected = SubscriptionIdentifierSemanticException.class)
public void testSubscribe_null() {
consumer.subscribe((String) null);
}
Expand Down Expand Up @@ -319,7 +321,7 @@ public void testSubscribe_dup() {
consumer1.close();
}

@Test(expected = NullPointerException.class)
@Test(expected = SubscriptionIdentifierSemanticException.class)
public void testUnSubscribe_null() {
consumer.unsubscribe((String) null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ public void testCreateConsumer_null() {
new SubscriptionPushConsumer(null).open();
}

@Test(expected = NullPointerException.class)
@Test(
expected =
SubscriptionConnectionException.class) // connect to TEndPoint(ip:localhost, port:6667)
public void testCreateConsumer_empty() {
new SubscriptionPushConsumer(new Properties()).open();
}
Expand All @@ -216,7 +218,7 @@ public void testCreateConsumer_empty2() {
new SubscriptionPushConsumer.Builder().buildPushConsumer().open();
}

@Test(expected = NullPointerException.class)
@Test(expected = SubscriptionIdentifierSemanticException.class)
public void testSubscribe_null() {
consumer.subscribe((String) null);
}
Expand Down Expand Up @@ -246,7 +248,7 @@ public void testSubscribe_dup() {
consumer1.close();
}

@Test(expected = NullPointerException.class)
@Test(expected = SubscriptionIdentifierSemanticException.class)
public void testUnSubscribe_null() {
consumer.unsubscribe((String) null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionIdentifierSemanticException;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;

Expand Down Expand Up @@ -78,13 +79,13 @@ private void printTopics(String msg)
subs.getTopics().forEach(System.out::println);
}

@Test // Will create a topic named null
@Test(expected = SubscriptionIdentifierSemanticException.class)
public void testCreateTopic_null() throws IoTDBConnectionException, StatementExecutionException {
subs.createTopic(null);
printTopics("testCreateTopic_null");
}

@Test(expected = StatementExecutionException.class)
@Test(expected = SubscriptionIdentifierSemanticException.class)
public void testCreateTopic_emptyString()
throws IoTDBConnectionException, StatementExecutionException {
subs.createTopic("");
Expand All @@ -98,7 +99,7 @@ public void testCreateTopic_dup() throws IoTDBConnectionException, StatementExec
printTopics("testCreateTopic_dup");
}

@Test(expected = StatementExecutionException.class)
@Test(expected = SubscriptionIdentifierSemanticException.class)
public void testCreateTopic_invalid()
throws IoTDBConnectionException, StatementExecutionException {
subs.createTopic("Topic-1");
Expand Down Expand Up @@ -209,12 +210,12 @@ public void testCreateTopic_invalidTime6()
dropDB(database);
}

@Test(expected = StatementExecutionException.class) // drop non-existent topic
@Test(expected = SubscriptionIdentifierSemanticException.class)
public void testDropTopic_null() throws IoTDBConnectionException, StatementExecutionException {
subs.dropTopic(null);
}

@Test(expected = StatementExecutionException.class)
@Test(expected = SubscriptionIdentifierSemanticException.class)
public void testDropTopic_empty() throws IoTDBConnectionException, StatementExecutionException {
subs.dropTopic("");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.session.SessionConnection;
import org.apache.iotdb.session.subscription.model.Subscription;
import org.apache.iotdb.session.subscription.model.Topic;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;

import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.RowRecord;
Expand Down Expand Up @@ -101,6 +102,7 @@ public SessionConnection constructSessionConnection(
*/
public void createTopic(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("CREATE TOPIC %s", topicName);
executeNonQueryStatement(sql);
}
Expand All @@ -118,6 +120,7 @@ public void createTopic(final String topicName)
*/
public void createTopicIfNotExists(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("CREATE TOPIC IF NOT EXISTS %s", topicName);
executeNonQueryStatement(sql);
}
Expand All @@ -136,6 +139,7 @@ public void createTopicIfNotExists(final String topicName)
*/
public void createTopic(final String topicName, final Properties properties)
throws IoTDBConnectionException, StatementExecutionException {
IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
createTopic(topicName, properties, false);
}

Expand All @@ -151,6 +155,7 @@ public void createTopic(final String topicName, final Properties properties)
*/
public void createTopicIfNotExists(final String topicName, final Properties properties)
throws IoTDBConnectionException, StatementExecutionException {
IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
createTopic(topicName, properties, true);
}

Expand Down Expand Up @@ -195,6 +200,7 @@ private void createTopic(
*/
public void dropTopic(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("DROP TOPIC %s", topicName);
executeNonQueryStatement(sql);
}
Expand All @@ -212,6 +218,7 @@ public void dropTopic(final String topicName)
*/
public void dropTopicIfExists(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("DROP TOPIC IF EXISTS %s", topicName);
executeNonQueryStatement(sql);
}
Expand All @@ -225,6 +232,7 @@ public Set<Topic> getTopics() throws IoTDBConnectionException, StatementExecutio

public Optional<Topic> getTopic(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("SHOW TOPIC %s", topicName);
try (final SessionDataSet dataSet = executeQueryStatement(sql)) {
final Set<Topic> topics = convertDataSetToTopics(dataSet);
Expand All @@ -247,6 +255,7 @@ public Set<Subscription> getSubscriptions()

public Set<Subscription> getSubscriptions(final String topicName)
throws IoTDBConnectionException, StatementExecutionException {
IdentifierUtils.checkAndParseIdentifier(topicName); // ignore the parse result
final String sql = String.format("SHOW SUBSCRIPTIONS ON %s", topicName);
try (final SessionDataSet dataSet = executeQueryStatement(sql)) {
return convertDataSetToSubscriptions(dataSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
import org.apache.iotdb.session.util.SessionUtils;

import org.apache.thrift.annotation.Nullable;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -316,7 +317,9 @@ private void subscribe(Set<String> topicNames, final boolean needParse)

if (needParse) {
topicNames =
topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
topicNames.stream()
.map(IdentifierUtils::checkAndParseIdentifier)
.collect(Collectors.toSet());
}

providers.acquireReadLock();
Expand Down Expand Up @@ -346,7 +349,9 @@ private void unsubscribe(Set<String> topicNames, final boolean needParse)

if (needParse) {
topicNames =
topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
topicNames.stream()
.map(IdentifierUtils::checkAndParseIdentifier)
.collect(Collectors.toSet());
}

providers.acquireReadLock();
Expand Down Expand Up @@ -377,7 +382,7 @@ SubscriptionProvider constructProviderAndHandshake(final TEndPoint endPoint)
} catch (final Exception ignored) {
}
throw new SubscriptionConnectionException(
String.format("Failed to handshake with subscription provider %s", provider));
String.format("Failed to handshake with subscription provider %s", provider), e);
}

// update consumer id and consumer group id if not exist
Expand Down Expand Up @@ -1407,13 +1412,19 @@ public Builder password(final String password) {
return this;
}

public Builder consumerId(final String consumerId) {
this.consumerId = IdentifierUtils.parseIdentifier(consumerId);
public Builder consumerId(@Nullable final String consumerId) {
if (Objects.isNull(consumerId)) {
return this;
}
this.consumerId = IdentifierUtils.checkAndParseIdentifier(consumerId);
return this;
}

public Builder consumerGroupId(final String consumerGroupId) {
this.consumerGroupId = IdentifierUtils.parseIdentifier(consumerGroupId);
public Builder consumerGroupId(@Nullable final String consumerGroupId) {
if (Objects.isNull(consumerGroupId)) {
return this;
}
this.consumerGroupId = IdentifierUtils.checkAndParseIdentifier(consumerGroupId);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ public List<SubscriptionMessage> poll(final Set<String> topicNames, final long t
throws SubscriptionException {
// parse topic names from external source
Set<String> parsedTopicNames =
topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet());
topicNames.stream()
.map(IdentifierUtils::checkAndParseIdentifier)
.collect(Collectors.toSet());

if (!parsedTopicNames.isEmpty()) {
// filter unsubscribed topics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.read.common.parser.PathVisitor;

import java.util.Objects;

public class IdentifierUtils {

/**
* refer org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor#parseIdentifier(java.lang.String)
*/
public static String parseIdentifier(final String src) {
public static String checkAndParseIdentifier(final String src) {
if (Objects.isNull(src)) {
throw new SubscriptionIdentifierSemanticException("null identifier is not supported");
}
if (src.isEmpty()) {
throw new SubscriptionIdentifierSemanticException("empty identifier is not supported");
}
if (src.startsWith(TsFileConstant.BACK_QUOTE_STRING)
&& src.endsWith(TsFileConstant.BACK_QUOTE_STRING)) {
return src.substring(1, src.length() - 1)
Expand Down