Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,16 @@ public Map<String, String> getAttributesWithPathOrPattern() {
attributes.getOrDefault(TopicConstant.PATH_KEY, TopicConstant.PATH_DEFAULT_VALUE));
}

public Map<String, String> getAttributesWithTimeRange(final long creationTime) {
public Map<String, String> getAttributesWithTimeRange() {
final Map<String, String> attributesWithTimeRange = new HashMap<>();

// parse start time
final String startTime =
attributes.getOrDefault(TopicConstant.START_TIME_KEY, String.valueOf(Long.MIN_VALUE));
if (TopicConstant.NOW_TIME_VALUE.equalsIgnoreCase(startTime)) {
attributesWithTimeRange.put(TopicConstant.START_TIME_KEY, String.valueOf(creationTime));
} else {
attributesWithTimeRange.put(TopicConstant.START_TIME_KEY, startTime);
}

// parse end time
final String endTime =
attributes.getOrDefault(TopicConstant.END_TIME_KEY, String.valueOf(Long.MAX_VALUE));
if (TopicConstant.NOW_TIME_VALUE.equalsIgnoreCase(endTime)) {
attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, String.valueOf(creationTime));
} else {
attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, endTime);
}
// there should be no TopicConstant.NOW_TIME_VALUE here
attributesWithTimeRange.put(
TopicConstant.START_TIME_KEY,
attributes.getOrDefault(TopicConstant.START_TIME_KEY, String.valueOf(Long.MIN_VALUE)));
attributesWithTimeRange.put(
TopicConstant.END_TIME_KEY,
attributes.getOrDefault(TopicConstant.END_TIME_KEY, String.valueOf(Long.MAX_VALUE)));

return attributesWithTimeRange;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.topic;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
Expand Down Expand Up @@ -78,9 +76,7 @@ protected void executeFromValidate(ConfigNodeProcedureEnv env) throws Subscripti
topicMeta =
new TopicMeta(
createTopicReq.getTopicName(),
CommonDateTimeUtils.convertMilliTimeWithPrecision(
System.currentTimeMillis(),
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
System.currentTimeMillis(),
createTopicReq.getTopicAttributes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.trigger.api.Trigger;
Expand Down Expand Up @@ -1991,14 +1992,31 @@ public SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement createT
final String topicName = createTopicStatement.getTopicName();
final Map<String, String> topicAttributes = createTopicStatement.getTopicAttributes();

// Replace now value with current time (raw timestamp based on system timestamp precision)
final long currentTime =
CommonDateTimeUtils.convertMilliTimeWithPrecision(
System.currentTimeMillis(),
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
topicAttributes.computeIfPresent(
TopicConstant.START_TIME_KEY,
(k, v) -> {
if (TopicConstant.NOW_TIME_VALUE.equals(v)) {
return String.valueOf(currentTime);
}
return v;
});
topicAttributes.computeIfPresent(
TopicConstant.END_TIME_KEY,
(k, v) -> {
if (TopicConstant.NOW_TIME_VALUE.equals(v)) {
return String.valueOf(currentTime);
}
return v;
});

// Validate topic config
final TopicMeta temporaryTopicMeta =
new TopicMeta(
topicName,
CommonDateTimeUtils.convertMilliTimeWithPrecision(
System.currentTimeMillis(),
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
topicAttributes);
new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes);
try {
PipeDataNodeAgent.plugin()
.validateExtractor(temporaryTopicMeta.generateExtractorAttributes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class TopicMeta {

private String topicName;
private long creationTime; // raw timestamp based on system timestamp precision
private long creationTime; // unit in ms
private TopicConfig config;

private Set<String> subscribedConsumerGroupIds;
Expand Down Expand Up @@ -182,7 +182,7 @@ public Map<String, String> generateExtractorAttributes() {
// path
extractorAttributes.putAll(config.getAttributesWithPathOrPattern());
// time
extractorAttributes.putAll(config.getAttributesWithTimeRange(creationTime));
extractorAttributes.putAll(config.getAttributesWithTimeRange());
// realtime mode
extractorAttributes.putAll(config.getAttributesWithRealtimeMode());
// source mode
Expand Down