Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.service.IoTDB;

import org.apache.commons.collections4.map.MultiKeyMap;
Expand Down Expand Up @@ -139,8 +140,8 @@ default MultiKeyMap<Long, PartitionGroup> partitionByPathRangeTime(

MultiKeyMap<Long, PartitionGroup> timeRangeMapRaftGroup = new MultiKeyMap<>();
PartialPath storageGroup = IoTDB.metaManager.getBelongedStorageGroup(path);
startTime = StorageEngine.convertMilliWithPrecision(startTime);
endTime = StorageEngine.convertMilliWithPrecision(endTime);
startTime = DateTimeUtils.convertMilliTimeWithPrecision(startTime);
endTime = DateTimeUtils.convertMilliTimeWithPrecision(endTime);
while (startTime <= endTime) {
long nextTime = (startTime / partitionInterval + 1) * partitionInterval;
timeRangeMapRaftGroup.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private IoTDBConstant() {}

public static final String COLUMN_STORAGE_GROUP = "storage group";
public static final String COLUMN_LOCK_INFO = "lock holder";
public static final String COLUMN_TTL = "ttl";
public static final String COLUMN_TTL = "ttl(ms)";

public static final String COLUMN_TASK_ID = "task id";
public static final String COLUMN_SUBMIT_TIME = "submit time";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.iotdb.db.engine.compaction.cross.CrossCompactionStrategy;
import org.apache.iotdb.db.engine.compaction.inner.InnerCompactionStrategy;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.external.api.IPropertiesLoader;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
Expand Down Expand Up @@ -1510,7 +1510,7 @@ private void loadCQProps(Properties properties) {
}

conf.setContinuousQueryMinimumEveryInterval(
DatetimeUtils.convertDurationStrToLong(
DateTimeUtils.convertDurationStrToLong(
properties.getProperty("continuous_query_minimum_every_interval", "1s"),
conf.getTimestampPrecision()));

Expand Down
24 changes: 6 additions & 18 deletions server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.IoTDB;
Expand Down Expand Up @@ -254,7 +255,7 @@ public static StorageEngine getInstance() {

private static void initTimePartition() {
timePartitionInterval =
convertMilliWithPrecision(
DateTimeUtils.convertMilliTimeWithPrecision(
IoTDBDescriptor.getInstance().getConfig().getPartitionInterval() * 1000L);
}

Expand Down Expand Up @@ -297,22 +298,6 @@ public static void transmitOperationSync(PhysicalPlan physicalPlan) {
}
}

public static long convertMilliWithPrecision(long milliTime) {
long result = milliTime;
String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
switch (timePrecision) {
case "ns":
result = milliTime * 1000_000L;
break;
case "us":
result = milliTime * 1000L;
break;
default:
break;
}
return result;
}

public static String getDeviceNameByPlan(PhysicalPlan plan) {
if (plan instanceof InsertPlan) {
InsertPlan physicalPlan = (InsertPlan) plan;
Expand Down Expand Up @@ -342,6 +327,9 @@ public static long getTimePartitionInterval() {
@TestOnly
public static void setTimePartitionInterval(long timePartitionInterval) {
StorageEngine.timePartitionInterval = timePartitionInterval;
if (timePartitionInterval == -1) {
initTimePartition();
}
}

public static long getTimePartition(long time) {
Expand Down Expand Up @@ -787,7 +775,7 @@ public VirtualStorageGroupProcessor buildNewStorageGroupProcessor(
virtualStorageGroupId,
fileFlushPolicy,
storageGroupMNode.getFullPath());
processor.setDataTTL(storageGroupMNode.getDataTTL());
processor.setDataTTLWithTimePrecisionCheck(storageGroupMNode.getDataTTL());
processor.setCustomFlushListeners(customFlushListeners);
processor.setCustomCloseFileListeners(customCloseFileListeners);
return processor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.tsfile.utils.FilePathUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -373,7 +373,7 @@ public void checkArchivingTasks() {

for (ArchivingTask task : archivingTasks) {

if (task.getStartTime() - DatetimeUtils.currentTime() <= 0
if (task.getStartTime() - DateTimeUtils.currentTime() <= 0
&& task.getStatus() == ArchivingTask.ArchivingTaskStatus.READY) {

// storage group has no data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

Expand Down Expand Up @@ -76,7 +76,7 @@ public ArchivingTask(
this.targetDir = targetDir;
this.ttl = ttl;
this.startTime = startTime;
this.submitTime = DatetimeUtils.currentTime();
this.submitTime = DateTimeUtils.currentTime();
}

public ArchivingTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.query.dataset.ShowContinuousQueriesResult;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
Expand All @@ -49,7 +49,7 @@ public class ContinuousQueryService implements IService {

private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryService.class);

private static final long SYSTEM_STARTUP_TIME = DatetimeUtils.currentTime();
private static final long SYSTEM_STARTUP_TIME = DateTimeUtils.currentTime();

private static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
ContinuousQueryTaskPoolManager.getInstance();
Expand Down Expand Up @@ -130,7 +130,7 @@ public void start() throws StartupException {
this::checkAndSubmitTasks,
0,
TASK_SUBMIT_CHECK_INTERVAL,
DatetimeUtils.timestampPrecisionStringToTimeUnit(
DateTimeUtils.timestampPrecisionStringToTimeUnit(
IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()));

LOGGER.info("Continuous query service started.");
Expand All @@ -157,7 +157,7 @@ private long calculateNextExecutionTimestamp(
}

private void checkAndSubmitTasks() {
long currentTimestamp = DatetimeUtils.currentTime();
long currentTimestamp = DateTimeUtils.currentTime();
for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
long nextExecutionTimestamp = nextExecutionTimestamps.get(plan.getContinuousQueryName());
while (currentTimestamp >= nextExecutionTimestamp) {
Expand Down Expand Up @@ -242,7 +242,7 @@ private void doRegister(CreateContinuousQueryPlan plan) {
continuousQueryPlans.put(plan.getContinuousQueryName(), plan);
nextExecutionTimestamps.put(
plan.getContinuousQueryName(),
calculateNextExecutionTimestamp(plan, DatetimeUtils.currentTime()));
calculateNextExecutionTimestamp(plan, DateTimeUtils.currentTime()));
}

@TestOnly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.engine.querycontext;

import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
Expand Down Expand Up @@ -77,9 +78,9 @@ public void setDataTTL(long dataTTL) {
public Filter updateFilterUsingTTL(Filter filter) {
if (dataTTL != Long.MAX_VALUE) {
if (filter != null) {
filter = new AndFilter(filter, TimeFilter.gtEq(System.currentTimeMillis() - dataTTL));
filter = new AndFilter(filter, TimeFilter.gtEq(DateTimeUtils.currentTime() - dataTTL));
} else {
filter = TimeFilter.gtEq(System.currentTimeMillis() - dataTTL);
filter = TimeFilter.gtEq(DateTimeUtils.currentTime() - dataTTL);
}
}
return filter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.TestOnly;
Expand Down Expand Up @@ -769,7 +770,7 @@ public boolean isSatisfied(

/** @return whether the given time falls in ttl */
private boolean isAlive(long time, long dataTTL) {
return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= dataTTL;
return dataTTL == Long.MAX_VALUE || (DateTimeUtils.currentTime() - time) <= dataTTL;
}

public void setProcessor(TsFileProcessor processor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryFileManager;
Expand Down Expand Up @@ -876,7 +877,7 @@ public void insert(InsertRowPlan insertRowPlan)
throws WriteProcessException, TriggerExecutionException {
// reject insertions that are out of ttl
if (!isAlive(insertRowPlan.getTime())) {
throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
throw new OutOfTTLException(insertRowPlan.getTime(), (DateTimeUtils.currentTime() - dataTTL));
}
writeLock("InsertRow");
try {
Expand Down Expand Up @@ -1030,7 +1031,7 @@ public void insertTablet(InsertTabletPlan insertTabletPlan)

/** @return whether the given time falls in ttl */
private boolean isAlive(long time) {
return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= dataTTL;
return dataTTL == Long.MAX_VALUE || (DateTimeUtils.currentTime() - time) <= dataTTL;
}

/**
Expand Down Expand Up @@ -1485,7 +1486,7 @@ public synchronized void checkFilesTTL() {
logicalStorageGroupName + "-" + virtualStorageGroupId);
return;
}
long ttlLowerBound = System.currentTimeMillis() - dataTTL;
long ttlLowerBound = DateTimeUtils.currentTime() - dataTTL;
logger.debug(
"{}: TTL removing files before {}",
logicalStorageGroupName + "-" + virtualStorageGroupId,
Expand Down Expand Up @@ -1559,11 +1560,13 @@ public void checkArchivingTask(ArchivingTask task) {
logicalStorageGroupName + "-" + virtualStorageGroupId);
return;
}
long ttlLowerBound = System.currentTimeMillis() - task.getTTL();

long ttlLowerBound =
DateTimeUtils.currentTime() - DateTimeUtils.convertMilliTimeWithPrecision(task.getTTL());
logger.debug(
"{}: Archiving files before {}",
logicalStorageGroupName + "-" + virtualStorageGroupId,
new Date(ttlLowerBound));
DateTimeUtils.convertMillsecondToZonedDateTime(ttlLowerBound));

// copy to avoid concurrent modification of deletion
List<TsFileResource> seqFiles = new ArrayList<>(tsFileManager.getTsFileList(true));
Expand Down Expand Up @@ -1864,7 +1867,7 @@ private List<TsFileResource> getFileResourceListForQuery(
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();

long timeLowerBound =
dataTTL != Long.MAX_VALUE ? System.currentTimeMillis() - dataTTL : Long.MIN_VALUE;
dataTTL != Long.MAX_VALUE ? DateTimeUtils.currentTime() - dataTTL : Long.MIN_VALUE;
context.setQueryTimeLowerBound(timeLowerBound);

// for upgrade files and old files must be closed
Expand Down Expand Up @@ -2997,6 +3000,13 @@ public Collection<TsFileProcessor> getWorkUnsequenceTsFileProcessors() {
return workUnsequenceTsFileProcessors.values();
}

public void setDataTTLWithTimePrecisionCheck(long dataTTL) {
if (dataTTL != Long.MAX_VALUE) {
dataTTL = DateTimeUtils.convertMilliTimeWithPrecision(dataTTL);
}
this.dataTTL = dataTTL;
}

public void setDataTTL(long dataTTL) {
this.dataTTL = dataTTL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ public void setTTL(long dataTTL) {
for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
this.virtualStorageGroupProcessor) {
if (virtualStorageGroupProcessor != null) {
virtualStorageGroupProcessor.setDataTTL(dataTTL);
virtualStorageGroupProcessor.setDataTTLWithTimePrecisionCheck(dataTTL);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.iotdb.db.protocol.influxdb.input;

import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;

import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.CharStream;
Expand All @@ -41,7 +41,7 @@ public static List<Point> parserRecordsToPoints(String records) {
}

public static List<Point> parserRecordsToPointsWithPrecision(String records, String precision) {
return parserRecordsToPoints(records, DatetimeUtils.toTimeUnit(precision));
return parserRecordsToPoints(records, DateTimeUtils.toTimeUnit(precision));
}

public static List<Point> parserRecordsToPoints(String records, TimeUnit precision) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.iotdb.db.qp.logical.crud.WhereComponent;
import org.apache.iotdb.db.qp.sql.InfluxDBSqlParser;
import org.apache.iotdb.db.qp.sql.InfluxDBSqlParserBaseVisitor;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.ResultColumn;
import org.apache.iotdb.db.query.expression.binary.AdditionExpression;
Expand Down Expand Up @@ -249,9 +249,9 @@ private static Long parseDateExpression(InfluxDBSqlParser.DateExpressionContext
time = parseTimeFormat(ctx.getChild(0).getText());
for (int i = 1; i < ctx.getChildCount(); i = i + 2) {
if (ctx.getChild(i).getText().equals("+")) {
time += DatetimeUtils.convertDurationStrToLong(time, ctx.getChild(i + 1).getText());
time += DateTimeUtils.convertDurationStrToLong(time, ctx.getChild(i + 1).getText());
} else {
time -= DatetimeUtils.convertDurationStrToLong(time, ctx.getChild(i + 1).getText());
time -= DateTimeUtils.convertDurationStrToLong(time, ctx.getChild(i + 1).getText());
}
}
return time;
Expand All @@ -263,7 +263,7 @@ public static long parseTimeFormat(String timestampStr) {
throw new IllegalArgumentException("input timestamp cannot be empty");
}
if (timestampStr.equalsIgnoreCase(SQLConstant.NOW_FUNC)) {
return DatetimeUtils.currentTime();
return DateTimeUtils.currentTime();
}
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
Expand Down Expand Up @@ -1336,13 +1336,13 @@ private QueryDataSet processShowArchiving(ShowArchivingPlan showArchivingPlan) {
ttl = null;
}
ZonedDateTime submitDateTime =
DatetimeUtils.convertMillsecondToZonedDateTime(task.getSubmitTime());
String submitTimeStr = DatetimeUtils.ISO_OFFSET_DATE_TIME_WITH_MS.format(submitDateTime);
DateTimeUtils.convertMillsecondToZonedDateTime(task.getSubmitTime());
String submitTimeStr = DateTimeUtils.ISO_OFFSET_DATE_TIME_WITH_MS.format(submitDateTime);
submitTime.setBinaryV(new Binary(submitTimeStr));
if (task.getStartTime() != Long.MAX_VALUE) {
ZonedDateTime startDate =
DatetimeUtils.convertMillsecondToZonedDateTime(task.getStartTime());
String startTimeStr = DatetimeUtils.ISO_OFFSET_DATE_TIME_WITH_MS.format(startDate);
DateTimeUtils.convertMillsecondToZonedDateTime(task.getStartTime());
String startTimeStr = DateTimeUtils.ISO_OFFSET_DATE_TIME_WITH_MS.format(startDate);
startTime = new Field(TSDataType.TEXT);
startTime.setBinaryV(new Binary(startTimeStr));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
Expand Down Expand Up @@ -75,7 +75,7 @@ public CreateContinuousQueryPlan(
this.firstExecutionTimeBoundary =
firstExecutionTimeBoundary != null
? firstExecutionTimeBoundary
: DatetimeUtils.currentTime();
: DateTimeUtils.currentTime();
}

public String getQuerySql() {
Expand Down
Loading