Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ private static void dataSubscription() throws Exception {
}
}
for (final SubscriptionMessage message : messages) {
System.out.println(message.getDatabaseName());
for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
System.out.println(dataSet.getColumnNames());
System.out.println(dataSet.getColumnTypes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -88,13 +89,21 @@ public static TopicConfig deserialize(final ByteBuffer buffer) {
return new TopicConfig(ReadWriteIOUtils.readMap(buffer));
}

/////////////////////////////// utilities ///////////////////////////////
/////////////////////////////// table model ///////////////////////////////

public boolean isTableTopic() {
return SQL_DIALECT_TABLE_VALUE.equalsIgnoreCase(
attributes.getOrDefault(SQL_DIALECT_KEY, SQL_DIALECT_TREE_VALUE));
}

public Optional<String> getTopicDatabaseName() {
return isTableTopic()
? Optional.of(
attributes.getOrDefault(
TopicConstant.DATABASE_KEY, TopicConstant.DATABASE_DEFAULT_VALUE))
: Optional.empty();
}

/////////////////////////////// extractor attributes mapping ///////////////////////////////

public Map<String, String> getAttributeWithSqlDialect() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ public String getConsumerGroupId() {
return consumerGroupId;
}

/////////////////////////////// table model ///////////////////////////////

public Optional<String> getTopicDatabaseName(final String topicName) {
return Optional.ofNullable(subscribedTopics.get(topicName))
.flatMap(TopicConfig::getTopicDatabaseName);
}

/////////////////////////////// ctor ///////////////////////////////

protected AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) {
Expand Down Expand Up @@ -731,7 +738,7 @@ private Optional<SubscriptionMessage> pollFile(
// construct temporary message to nack
nack(
Collections.singletonList(
new SubscriptionMessage(commitContext, file.getAbsolutePath())));
new SubscriptionMessage(commitContext, file.getAbsolutePath(), null)));
throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
}
}
Expand Down Expand Up @@ -884,7 +891,10 @@ private SubscriptionMessage pollFileInternal(

// generate subscription message
inFlightFilesCommitContextSet.remove(commitContext);
return new SubscriptionMessage(commitContext, file.getAbsolutePath());
return new SubscriptionMessage(
commitContext,
file.getAbsolutePath(),
getTopicDatabaseName(commitContext.getTopicName()).orElse(null));
}
case ERROR:
{
Expand Down Expand Up @@ -934,7 +944,7 @@ private Optional<SubscriptionMessage> pollTablets(
// construct temporary message to nack
nack(
Collections.singletonList(
new SubscriptionMessage(response.getCommitContext(), Collections.emptyList())));
new SubscriptionMessage(response.getCommitContext(), Collections.emptyList(), null)));
throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
}
}
Expand All @@ -955,7 +965,11 @@ private Optional<SubscriptionMessage> pollTabletsInternal(
LOGGER.warn(errorMessage);
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
}
return Optional.of(new SubscriptionMessage(commitContext, tablets));
return Optional.of(
new SubscriptionMessage(
commitContext,
tablets,
getTopicDatabaseName(commitContext.getTopicName()).orElse(null)));
}

timer.update();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;

import org.apache.thrift.annotation.Nullable;
import org.apache.tsfile.write.record.Tablet;

import java.util.List;
Expand All @@ -35,18 +36,26 @@ public class SubscriptionMessage

private final SubscriptionMessageHandler handler;

@Nullable private final String databaseName;

public SubscriptionMessage(
final SubscriptionCommitContext commitContext, final List<Tablet> tablets) {
final SubscriptionCommitContext commitContext,
final List<Tablet> tablets,
@Nullable final String databaseName) {
this.commitContext = commitContext;
this.messageType = SubscriptionMessageType.SESSION_DATA_SETS_HANDLER.getType();
this.handler = new SubscriptionSessionDataSetsHandler(tablets);
this.databaseName = databaseName;
}

public SubscriptionMessage(
final SubscriptionCommitContext commitContext, final String absolutePath) {
final SubscriptionCommitContext commitContext,
final String absolutePath,
@Nullable final String databaseName) {
this.commitContext = commitContext;
this.messageType = SubscriptionMessageType.TS_FILE_HANDLER.getType();
this.handler = new SubscriptionTsFileHandler(absolutePath);
this.databaseName = databaseName;
}

public SubscriptionCommitContext getCommitContext() {
Expand All @@ -57,6 +66,11 @@ public short getMessageType() {
return messageType;
}

@Nullable
public String getDatabaseName() {
return databaseName;
}

/////////////////////////////// override ///////////////////////////////

@Override
Expand Down
Loading