From c0f824895a4d0c071e70e03e4fb9f06ba01b4560 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Wed, 24 Jul 2024 18:30:58 +0800 Subject: [PATCH 1/2] setup --- .../rpc/subscription/config/TopicConfig.java | 26 +++++----------- .../executor/ClusterConfigTaskExecutor.java | 30 +++++++++++++++---- .../subscription/meta/topic/TopicMeta.java | 4 +-- 3 files changed, 34 insertions(+), 26 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java index 21a4b4873c85e..fc967835dd24f 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java @@ -88,26 +88,16 @@ public Map getAttributesWithPathOrPattern() { attributes.getOrDefault(TopicConstant.PATH_KEY, TopicConstant.PATH_DEFAULT_VALUE)); } - public Map getAttributesWithTimeRange(final long creationTime) { + public Map getAttributesWithTimeRange() { final Map attributesWithTimeRange = new HashMap<>(); - // parse start time - final String startTime = - attributes.getOrDefault(TopicConstant.START_TIME_KEY, String.valueOf(Long.MIN_VALUE)); - if (TopicConstant.NOW_TIME_VALUE.equalsIgnoreCase(startTime)) { - attributesWithTimeRange.put(TopicConstant.START_TIME_KEY, String.valueOf(creationTime)); - } else { - attributesWithTimeRange.put(TopicConstant.START_TIME_KEY, startTime); - } - - // parse end time - final String endTime = - attributes.getOrDefault(TopicConstant.END_TIME_KEY, String.valueOf(Long.MAX_VALUE)); - if (TopicConstant.NOW_TIME_VALUE.equalsIgnoreCase(endTime)) { - attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, String.valueOf(creationTime)); - } else { - attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, endTime); - } + // there should be no TopicConstant.NOW_TIME_VALUE here + attributesWithTimeRange.put( + TopicConstant.START_TIME_KEY, + attributes.getOrDefault(TopicConstant.START_TIME_KEY, String.valueOf(Long.MIN_VALUE))); + attributesWithTimeRange.put( + TopicConstant.END_TIME_KEY, + attributes.getOrDefault(TopicConstant.END_TIME_KEY, String.valueOf(Long.MAX_VALUE))); return attributesWithTimeRange; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index e655d1fc71d97..8587fc606dd73 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -222,6 +222,7 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.iotdb.trigger.api.Trigger; @@ -1991,14 +1992,31 @@ public SettableFuture createTopic(CreateTopicStatement createT final String topicName = createTopicStatement.getTopicName(); final Map topicAttributes = createTopicStatement.getTopicAttributes(); + // Replace now value with current time (raw timestamp based on system timestamp precision) + final long currentTime = + CommonDateTimeUtils.convertMilliTimeWithPrecision( + System.currentTimeMillis(), + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + topicAttributes.computeIfPresent( + TopicConstant.START_TIME_KEY, + (k, v) -> { + if (TopicConstant.NOW_TIME_VALUE.equals(v)) { + return String.valueOf(currentTime); + } + return v; + }); + topicAttributes.computeIfPresent( + TopicConstant.END_TIME_KEY, + (k, v) -> { + if (TopicConstant.NOW_TIME_VALUE.equals(v)) { + return String.valueOf(currentTime); + } + return v; + }); + // Validate topic config final TopicMeta temporaryTopicMeta = - new TopicMeta( - topicName, - CommonDateTimeUtils.convertMilliTimeWithPrecision( - System.currentTimeMillis(), - CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), - topicAttributes); + new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes); try { PipeDataNodeAgent.plugin() .validateExtractor(temporaryTopicMeta.generateExtractorAttributes()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java index 14c48c85da33b..e3bde8a001764 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java @@ -39,7 +39,7 @@ public class TopicMeta { private String topicName; - private long creationTime; // raw timestamp based on system timestamp precision + private long creationTime; // unit in ms private TopicConfig config; private Set subscribedConsumerGroupIds; @@ -182,7 +182,7 @@ public Map generateExtractorAttributes() { // path extractorAttributes.putAll(config.getAttributesWithPathOrPattern()); // time - extractorAttributes.putAll(config.getAttributesWithTimeRange(creationTime)); + extractorAttributes.putAll(config.getAttributesWithTimeRange()); // realtime mode extractorAttributes.putAll(config.getAttributesWithRealtimeMode()); // source mode From a10e6e6eac3a3cf09aa0deb275e7f6db7c484949 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Wed, 24 Jul 2024 18:33:44 +0800 Subject: [PATCH 2/2] fixup! setup --- .../impl/subscription/topic/CreateTopicProcedure.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java index afdbfe244d724..ed3d59bd3d480 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java @@ -20,9 +20,7 @@ package org.apache.iotdb.confignode.procedure.impl.subscription.topic; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; -import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan; import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -78,9 +76,7 @@ protected void executeFromValidate(ConfigNodeProcedureEnv env) throws Subscripti topicMeta = new TopicMeta( createTopicReq.getTopicName(), - CommonDateTimeUtils.convertMilliTimeWithPrecision( - System.currentTimeMillis(), - CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), + System.currentTimeMillis(), createTopicReq.getTopicAttributes()); }