Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.TSRecord;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.record.datapoint.IntDataPoint;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -55,6 +61,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -73,6 +80,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@SuppressWarnings({"ThrowFromFinallyBlock", "ResultOfMethodCallIgnored"})
@RunWith(IoTDBTestRunner.class)
public class IoTDBSessionSimpleIT {

Expand Down Expand Up @@ -337,7 +345,7 @@ public void insertByObjAndNotInferTypeTest() {
expected.add(TSDataType.TEXT.name());

Set<String> actual = new HashSet<>();
SessionDataSet dataSet = session.executeQueryStatement("show timeseries root.**");
SessionDataSet dataSet = session.executeQueryStatement("show timeseries root.sg1.**");
while (dataSet.hasNext()) {
actual.add(dataSet.next().getFields().get(3).getStringValue());
}
Expand Down Expand Up @@ -781,7 +789,7 @@ public void createWrongTimeSeriesTest() {
LOGGER.error("", e);
}

final SessionDataSet dataSet = session.executeQueryStatement("SHOW TIMESERIES");
final SessionDataSet dataSet = session.executeQueryStatement("SHOW TIMESERIES root.sg.**");
assertFalse(dataSet.hasNext());

session.deleteStorageGroup(storageGroup);
Expand Down Expand Up @@ -1891,4 +1899,159 @@ public void convertRecordsToTabletsTest() {
e.printStackTrace();
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertMinMaxTimeTest() throws IoTDBConnectionException, StatementExecutionException {
try {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"false\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}

session.executeNonQueryStatement(
String.format(
"INSERT INTO root.testInsertMinMax.d1(timestamp, s1) VALUES (%d, 1)",
Long.MIN_VALUE));
session.executeNonQueryStatement(
String.format(
"INSERT INTO root.testInsertMinMax.d1(timestamp, s1) VALUES (%d, 1)",
Long.MAX_VALUE));

SessionDataSet dataSet =
session.executeQueryStatement("SELECT * FROM root.testInsertMinMax.d1");
RowRecord record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());

session.executeNonQueryStatement("FLUSH");
dataSet = session.executeQueryStatement("SELECT * FROM root.testInsertMinMax.d1");
record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());
}
} finally {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"true\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
}
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void loadMinMaxTimeNonAlignedTest() throws Exception {
File file = new File("target", "test.tsfile");
try (TsFileWriter writer = new TsFileWriter(file)) {
IDeviceID deviceID = new PlainDeviceID("root.testLoadMinMax.d1");
writer.registerTimeseries(new Path(deviceID), new MeasurementSchema("s1", TSDataType.INT32));
TSRecord record = new TSRecord(Long.MIN_VALUE, deviceID);
record.addTuple(new IntDataPoint("s1", 1));
writer.write(record);
record.setTime(Long.MAX_VALUE);
writer.write(record);
}

try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"false\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
session.executeNonQueryStatement("LOAD \"" + file.getAbsolutePath() + "\"");

SessionDataSet dataSet =
session.executeQueryStatement("SELECT * FROM root.testLoadMinMax.d1");
RowRecord record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());
} finally {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"true\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
}
file.delete();
}
}

@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void loadMinMaxTimeAlignedTest() throws Exception {
File file = new File("target", "test.tsfile");
try (TsFileWriter writer = new TsFileWriter(file)) {
IDeviceID deviceID = new PlainDeviceID("root.testLoadMinMaxAligned.d1");
writer.registerAlignedTimeseries(
new Path(deviceID),
Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
TSRecord record = new TSRecord(Long.MIN_VALUE, deviceID);
record.addTuple(new IntDataPoint("s1", 1));
writer.writeAligned(record);
record.setTime(Long.MAX_VALUE);
writer.writeAligned(record);
}

try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"false\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
session.executeNonQueryStatement("LOAD \"" + file.getAbsolutePath() + "\"");

SessionDataSet dataSet =
session.executeQueryStatement("SELECT * FROM root.testLoadMinMaxAligned.d1");
RowRecord record = dataSet.next();
assertEquals(Long.MIN_VALUE, record.getTimestamp());
record = dataSet.next();
assertEquals(Long.MAX_VALUE, record.getTimestamp());
assertFalse(dataSet.hasNext());
} finally {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
try {
session.executeNonQueryStatement(
"SET CONFIGURATION \"timestamp_precision_check_enabled\"=\"true\"");
} catch (StatementExecutionException e) {
// run in IDE will trigger this, ignore it
if (!e.getMessage().contains("Unable to find the configuration file")) {
throw e;
}
}
}
file.delete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2017,6 +2017,15 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
BinaryAllocator.getInstance().close(true);
}

commonDescriptor
.getConfig()
.setTimestampPrecisionCheckEnabled(
Boolean.parseBoolean(
properties.getProperty(
"timestamp_precision_check_enabled",
ConfigurationFileUtils.getConfigurationDefaultValue(
"timestamp_precision_check_enabled"))));

// update query_sample_throughput_bytes_per_sec
loadQuerySampleThroughput(properties);
// update trusted_uri_pattern
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2280,7 +2280,11 @@ public static Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> getTimePart
result.add(timePartitionSlot);
// next init
timePartitionSlot = new TTimePartitionSlot(endTime);
endTime = endTime + TimePartitionUtils.getTimePartitionInterval();
// beware of overflow
endTime =
endTime + TimePartitionUtils.getTimePartitionInterval() > endTime
? endTime + TimePartitionUtils.getTimePartitionInterval()
: Long.MAX_VALUE;
} else {
index++;
if (index < size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,12 +840,8 @@ public void setCurrentModificationsAndTimeIndex(TsFileResource resource) throws
}

public boolean isDeviceDeletedByMods(IDeviceID device) throws IllegalPathException {
return currentTimeIndex != null
&& ModificationUtils.isDeviceDeletedByMods(
currentModifications,
device,
currentTimeIndex.getStartTime(device),
currentTimeIndex.getEndTime(device));
return ModificationUtils.isDeviceDeletedByMods(
currentModifications, currentTimeIndex, device);
}

public boolean isTimeseriesDeletedByMods(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2006,15 +2006,18 @@ private long parseTimeValue(ConstantContext constant) {
if (constant.INTEGER_LITERAL() != null) {
try {
if (constant.MINUS() != null) {
return -Long.parseLong(constant.INTEGER_LITERAL().getText());
return Long.parseLong("-" + constant.INTEGER_LITERAL().getText());
}
return Long.parseLong(constant.INTEGER_LITERAL().getText());
} catch (NumberFormatException e) {
throw new SemanticException(
String.format(
"Current system timestamp precision is %s, "
"Failed to parse the timestamp: "
+ e.getMessage()
+ "Current system timestamp precision is %s, "
+ "please check whether the timestamp %s is correct.",
TIMESTAMP_PRECISION, constant.INTEGER_LITERAL().getText()));
TIMESTAMP_PRECISION,
constant.INTEGER_LITERAL().getText()));
}
} else if (constant.dateExpression() != null) {
return parseDateExpression(constant.dateExpression(), CommonDateTimeUtils.currentTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,22 @@ public boolean isTsFileEmpty() {
return resource.getDevices().isEmpty();
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
public boolean needDecodeTsFile(
Function<List<Pair<IDeviceID, TTimePartitionSlot>>, List<TRegionReplicaSet>> partitionFetcher)
throws IOException {
Function<List<Pair<IDeviceID, TTimePartitionSlot>>, List<TRegionReplicaSet>>
partitionFetcher) {
List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>();
resource
.getDevices()
.forEach(
o -> {
// iterating the index, must present
slotList.add(
new Pair<>(o, TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o))));
new Pair<>(
o, TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o).get())));
slotList.add(
new Pair<>(o, TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o))));
new Pair<>(
o, TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o).get())));
});

if (slotList.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,8 @@ protected void updateDeviceLastFlushTime(TsFileResource resource) {
long timePartitionId = resource.getTimePartition();
Map<IDeviceID, Long> endTimeMap = new HashMap<>();
for (IDeviceID deviceId : resource.getDevices()) {
long endTime = resource.getEndTime(deviceId);
@SuppressWarnings("OptionalGetWithoutIsPresent") // checked above
long endTime = resource.getEndTime(deviceId).get();
endTimeMap.put(deviceId, endTime);
}
if (config.isEnableSeparateData()) {
Expand All @@ -670,7 +671,9 @@ protected void upgradeAndUpdateDeviceLastFlushTime(
Map<IDeviceID, Long> endTimeMap = new HashMap<>();
for (TsFileResource resource : resources) {
for (IDeviceID deviceId : resource.getDevices()) {
long endTime = resource.getEndTime(deviceId);
// checked above
//noinspection OptionalGetWithoutIsPresent
long endTime = resource.getEndTime(deviceId).get();
endTimeMap.put(deviceId, endTime);
}
}
Expand Down Expand Up @@ -2326,6 +2329,7 @@ public void insertSeparatorToWAL() {
}
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
private boolean canSkipDelete(
TsFileResource tsFileResource,
Set<PartialPath> devicePaths,
Expand Down Expand Up @@ -2369,8 +2373,8 @@ private boolean canSkipDelete(
// resource does not contain this device
continue;
}
deviceStartTime = tsFileResource.getStartTime(deviceId);
deviceEndTime = tsFileResource.getEndTime(deviceId);
deviceStartTime = tsFileResource.getStartTime(deviceId).get();
deviceEndTime = tsFileResource.getEndTime(deviceId).get();
}

if (!tsFileResource.isClosed() && deviceEndTime == Long.MIN_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ public void perform() throws Exception {
long ttl = deviceIterator.getTTLForCurrentDevice();
sortedSourceFiles.removeIf(
x -> x.definitelyNotContains(device) || !x.isDeviceAlive(device, ttl));
sortedSourceFiles.sort(Comparator.comparingLong(x -> x.getStartTime(device)));
// checked above
//noinspection OptionalGetWithoutIsPresent
sortedSourceFiles.sort(Comparator.comparingLong(x -> x.getStartTime(device).get()));
if (ttl != Long.MAX_VALUE) {
Deletion ttlDeletion =
new Deletion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -214,7 +215,10 @@ public Pair<IDeviceID, Boolean> nextDevice() throws IllegalPathException {
ttlForCurrentDevice =
DataNodeTTLCache.getInstance()
.getTTL(((PlainDeviceID) currentDevice.getLeft()).toStringID());
timeLowerBoundForCurrentDevice = CommonDateTimeUtils.currentTime() - ttlForCurrentDevice;
timeLowerBoundForCurrentDevice =
ttlForCurrentDevice == Long.MAX_VALUE
? Long.MIN_VALUE
: CommonDateTimeUtils.currentTime() - ttlForCurrentDevice;
return currentDevice;
}

Expand Down Expand Up @@ -418,7 +422,8 @@ private void applyModificationForAlignedChunkMetadataList(
}
IDeviceID device = currentDevice.getLeft();
Deletion ttlDeletion = null;
if (tsFileResource.getStartTime(device) < timeLowerBoundForCurrentDevice) {
Optional<Long> startTime = tsFileResource.getStartTime(device);
if (startTime.isPresent() && startTime.get() < timeLowerBoundForCurrentDevice) {
ttlDeletion =
new Deletion(
CompactionPathUtils.getPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
Expand Down Expand Up @@ -641,7 +646,8 @@ public String nextSeries() throws IllegalPathException {
Map<String, List<ChunkMetadata>> chunkMetadataListMap = chunkMetadataCacheMap.get(reader);

Deletion ttlDeletion = null;
if (resource.getStartTime(device) < timeLowerBoundForCurrentDevice) {
Optional<Long> startTime = resource.getStartTime(device);
if (startTime.isPresent() && startTime.get() < timeLowerBoundForCurrentDevice) {
ttlDeletion =
new Deletion(
CompactionPathUtils.getPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
Expand Down
Loading
Loading