Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ public void testLoad() throws Exception {
generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL / 10_000, false);
generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL / 10_000, false);
generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL / 10_000, true);
for (int i = 0; i < 10000; i++) {
generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 10, true);
}
writtenPoint2 = generator.getTotalNumber();
}

Expand Down Expand Up @@ -252,6 +255,86 @@ public void testLoad() throws Exception {
}
}

@Test
public void testLoadAcrossMultipleTimePartitions() throws Exception {
registerSchema();

final long writtenPoint1;
// device 0, device 1, sg 0
try (final TsFileGenerator generator =
new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
generator.registerTimeseries(
SchemaConfig.DEVICE_0,
Arrays.asList(
SchemaConfig.MEASUREMENT_00,
SchemaConfig.MEASUREMENT_01,
SchemaConfig.MEASUREMENT_02,
SchemaConfig.MEASUREMENT_03,
SchemaConfig.MEASUREMENT_04,
SchemaConfig.MEASUREMENT_05,
SchemaConfig.MEASUREMENT_06,
SchemaConfig.MEASUREMENT_07));
generator.registerAlignedTimeseries(
SchemaConfig.DEVICE_1,
Arrays.asList(
SchemaConfig.MEASUREMENT_10,
SchemaConfig.MEASUREMENT_11,
SchemaConfig.MEASUREMENT_12,
SchemaConfig.MEASUREMENT_13,
SchemaConfig.MEASUREMENT_14,
SchemaConfig.MEASUREMENT_15,
SchemaConfig.MEASUREMENT_16,
SchemaConfig.MEASUREMENT_17));
for (int i = 0; i < 1000; i++) {
generator.generateData(SchemaConfig.DEVICE_0, 1, PARTITION_INTERVAL - 10, false);
}
for (int i = 0; i < 1000; i++) {
generator.generateData(SchemaConfig.DEVICE_1, 1, PARTITION_INTERVAL - 10, true);
}
writtenPoint1 = generator.getTotalNumber();
}

final long writtenPoint2;
// device 2, device 3, device4, sg 1
try (final TsFileGenerator generator =
new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
generator.registerTimeseries(
SchemaConfig.DEVICE_2, Collections.singletonList(SchemaConfig.MEASUREMENT_20));
generator.registerTimeseries(
SchemaConfig.DEVICE_3, Collections.singletonList(SchemaConfig.MEASUREMENT_30));
generator.registerAlignedTimeseries(
SchemaConfig.DEVICE_4, Collections.singletonList(SchemaConfig.MEASUREMENT_40));
for (int i = 0; i < 1000; i++) {
generator.generateData(SchemaConfig.DEVICE_2, 1, PARTITION_INTERVAL - 10, false);
}
for (int i = 0; i < 1000; i++) {
generator.generateData(SchemaConfig.DEVICE_3, 1, PARTITION_INTERVAL - 10, false);
}
for (int i = 0; i < 1000; i++) {
generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 10, true);
}
writtenPoint2 = generator.getTotalNumber();
}

try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {

statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));

try (final ResultSet resultSet =
statement.executeQuery("select count(*) from root.sg.** group by level=1,2")) {
if (resultSet.next()) {
long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
Assert.assertEquals(writtenPoint1, sg1Count);
long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)");
Assert.assertEquals(writtenPoint2, sg2Count);
} else {
Assert.fail("This ResultSet is empty.");
}
}
}
}

@Test
public void testLoadWithExtendTemplate() throws Exception {
final long writtenPoint1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,8 @@ public class IoTDBConfig {

private boolean loadActiveListeningEnable = true;

private long loadMeasurementIdCacheSizeInBytes = 2 * 1024 * 1024L; // 2MB

private String[] loadActiveListeningDirs =
new String[] {
IoTDBConstant.EXT_FOLDER_NAME
Expand Down Expand Up @@ -4185,6 +4187,14 @@ public void setLoadActiveListeningEnable(boolean loadActiveListeningEnable) {
this.loadActiveListeningEnable = loadActiveListeningEnable;
}

public long getLoadMeasurementIdCacheSizeInBytes() {
return loadMeasurementIdCacheSizeInBytes;
}

public void setLoadMeasurementIdCacheSizeInBytes(long loadMeasurementIdCacheSizeInBytes) {
this.loadMeasurementIdCacheSizeInBytes = loadMeasurementIdCacheSizeInBytes;
}

public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
this.pipeReceiverFileDirs = pipeReceiverFileDirs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2473,6 +2473,12 @@ private void loadLoadTsFileProps(TrimProperties properties) throws IOException {
? conf.getLoadActiveListeningCheckIntervalSeconds()
: loadActiveListeningCheckIntervalSeconds);

conf.setLoadMeasurementIdCacheSizeInBytes(
Long.parseLong(
properties.getProperty(
"load_measurement_id_cache_size_in_bytes",
Long.toString(conf.getLoadMeasurementIdCacheSizeInBytes()))));

conf.setLoadActiveListeningMaxThreadNum(
Integer.parseInt(
properties.getProperty(
Expand Down Expand Up @@ -2537,6 +2543,7 @@ private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOE
"load_active_listening_enable",
ConfigurationFileUtils.getConfigurationDefaultValue(
"load_active_listening_enable"))));

conf.setLoadActiveListeningDirs(
Arrays.stream(
properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.metrics.utils.MetricLevel;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.PageException;
Expand All @@ -74,10 +76,12 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -105,6 +109,12 @@ public class LoadTsFileManager {
new AtomicReference<>(CONFIG.getLoadTsFileDirs());
private static final AtomicReference<FolderManager> FOLDER_MANAGER = new AtomicReference<>();

public static final Cache<String, String> MEASUREMENT_ID_CACHE =
Caffeine.newBuilder()
.maximumWeight(CONFIG.getLoadMeasurementIdCacheSizeInBytes())
.weigher((String k, String v) -> v.length())
.build();

private final Map<String, TsFileWriterManager> uuid2WriterManager = new ConcurrentHashMap<>();

private final Map<String, CleanupTask> uuid2CleanupTask = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -376,6 +386,7 @@ private static class TsFileWriterManager {
private Map<DataPartitionInfo, TsFileResource> dataPartition2Resource;
private Map<DataPartitionInfo, String> dataPartition2LastDevice;
private Map<DataPartitionInfo, ModificationFile> dataPartition2ModificationFile;
private Map<String, Set<DataPartitionInfo>> device2Partition;
private boolean isClosed;

private TsFileWriterManager(File taskDir) {
Expand All @@ -384,6 +395,7 @@ private TsFileWriterManager(File taskDir) {
this.dataPartition2Resource = new HashMap<>();
this.dataPartition2LastDevice = new HashMap<>();
this.dataPartition2ModificationFile = new HashMap<>();
device2Partition = new HashMap<>();
this.isClosed = false;

clearDir(taskDir);
Expand Down Expand Up @@ -446,14 +458,36 @@ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
dataPartition2Resource.put(partitionInfo, resource);
}
TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
if (!chunkData.getDevice().equals(dataPartition2LastDevice.getOrDefault(partitionInfo, ""))) {
if (dataPartition2LastDevice.containsKey(partitionInfo)) {
writer.endChunkGroup();
writer.checkMetadataSizeAndMayFlush();

String device = chunkData.getDevice();
String lastDevice = dataPartition2LastDevice.get(partitionInfo);

if (!Objects.equals(device, lastDevice)) {
if (lastDevice != null && device2Partition.containsKey(lastDevice)) {
Set<DataPartitionInfo> partitions = device2Partition.get(lastDevice);
for (DataPartitionInfo partition : partitions) {
TsFileIOWriter w = dataPartition2Writer.get(partition);
if (dataPartition2LastDevice.containsKey(partition) && w != null) {
w.endChunkGroup();
w.checkMetadataSizeAndMayFlush();
}
}
device2Partition.remove(lastDevice);
}
writer.startChunkGroup(new PlainDeviceID(chunkData.getDevice()));
dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());

if (writer.isWritingChunkGroup()) {
LOGGER.warn(
"Writer {} for partition {} is already writing chunk group for device {}, but the last device is {}. ",
writer.getFile().getAbsolutePath(),
partitionInfo,
device,
lastDevice);
}
writer.startChunkGroup(new PlainDeviceID(device));
dataPartition2LastDevice.put(partitionInfo, device);
device2Partition.computeIfAbsent(device, k -> new HashSet<>()).add(partitionInfo);
}

chunkData.writeToFileWriter(writer);
}

Expand Down Expand Up @@ -672,8 +706,10 @@ private void close() {
LOGGER.warn(MESSAGE_DELETE_FAIL, taskDir.getPath(), e);
}
dataPartition2Writer = null;
dataPartition2Resource = null;
dataPartition2LastDevice = null;
dataPartition2ModificationFile = null;
device2Partition = null;
isClosed = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import java.util.List;
import java.util.Queue;

import static org.apache.iotdb.db.storageengine.load.LoadTsFileManager.MEASUREMENT_ID_CACHE;

public class AlignedChunkData implements ChunkData {
protected static final int DEFAULT_INT32 = 0;
protected static final long DEFAULT_INT64 = 0L;
Expand Down Expand Up @@ -447,7 +449,10 @@ public static AlignedChunkData deserialize(final InputStream stream)
final List<ChunkHeader> chunkHeaderList = new ArrayList<>();
for (int i = 0; i < chunkHeaderListSize; i++) {
final byte chunkType = ReadWriteIOUtils.readByte(stream);
chunkHeaderList.add(ChunkHeader.deserializeFrom(stream, chunkType));
ChunkHeader chunkHeader = ChunkHeader.deserializeFrom(stream, chunkType);
String measurementID = chunkHeader.getMeasurementID();
chunkHeader.setMeasurementID(MEASUREMENT_ID_CACHE.get(measurementID, m -> m));
chunkHeaderList.add(chunkHeader);
}
final List<Integer> pageNumbers = new ArrayList<>();
if (needDecodeChunk) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.io.Serializable;
import java.nio.ByteBuffer;

import static org.apache.iotdb.db.storageengine.load.LoadTsFileManager.MEASUREMENT_ID_CACHE;

public class NonAlignedChunkData implements ChunkData {

private final TTimePartitionSlot timePartitionSlot;
Expand Down Expand Up @@ -284,6 +286,8 @@ public static NonAlignedChunkData deserialize(final InputStream stream)
final boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
final byte chunkType = ReadWriteIOUtils.readByte(stream);
final ChunkHeader chunkHeader = ChunkHeader.deserializeFrom(stream, chunkType);
String measurementID = chunkHeader.getMeasurementID();
chunkHeader.setMeasurementID(MEASUREMENT_ID_CACHE.get(measurementID, m -> m));
int pageNumber = 0;
if (needDecodeChunk) {
pageNumber = ReadWriteIOUtils.readInt(stream);
Expand Down
Loading