diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java index 3334155273158..d72e58eed71ab 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java @@ -26,6 +26,7 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.write.TsFileWriter; @@ -72,8 +73,6 @@ public void setUp() throws Exception { originPartitionInterval = ConfigFactory.getConfig().getPartitionInterval(); ConfigFactory.getConfig().setPartitionInterval(PARTITION_INTERVAL); EnvFactory.getEnv().initBeforeTest(); - - registerSchema(); } @After @@ -159,6 +158,8 @@ private boolean deleteDir() { @Test public void testLoad() throws Exception { + registerSchema(); + long writtenPoint1 = 0; // device 0, device 1, sg 0 try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) { @@ -199,7 +200,66 @@ public void testLoad() throws Exception { try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { - statement.execute(String.format("load \"%s\"", tmpDir.getAbsolutePath())); + statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath())); + + try (ResultSet resultSet = + statement.executeQuery("select count(*) from root.** group by level=1,2")) { + if (resultSet.next()) { + long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)"); + Assert.assertEquals(writtenPoint1, sg1Count); + long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)"); + Assert.assertEquals(writtenPoint2, sg2Count); + } else { + Assert.fail("This ResultSet is empty."); + } + } + } + } + + @Test + public void testLoadWithAutoRegister() throws Exception { + long writtenPoint1 = 0; + // device 0, device 1, sg 0 + try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) { + generator.registerTimeseries( + new Path(SchemaConfig.DEVICE_0), + Arrays.asList( + SchemaConfig.MEASUREMENT_00, + SchemaConfig.MEASUREMENT_01, + SchemaConfig.MEASUREMENT_02, + SchemaConfig.MEASUREMENT_03)); + generator.registerAlignedTimeseries( + new Path(SchemaConfig.DEVICE_1), + Arrays.asList( + SchemaConfig.MEASUREMENT_10, + SchemaConfig.MEASUREMENT_11, + SchemaConfig.MEASUREMENT_12, + SchemaConfig.MEASUREMENT_13)); + generator.generateData(new Path(SchemaConfig.DEVICE_0), 10000, false); + generator.generateData(new Path(SchemaConfig.DEVICE_1), 10000, true); + writtenPoint1 = generator.getTotalNumber(); + } + + long writtenPoint2 = 0; + // device 2, device 3, device4, sg 1 + try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) { + generator.registerTimeseries( + new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20)); + generator.registerTimeseries( + new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30)); + generator.registerAlignedTimeseries( + new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40)); + generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false); + generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false); + generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true); + writtenPoint2 = generator.getTotalNumber(); + } + + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + String.format("load \"%s\" sglevel=2,autoregister=true", tmpDir.getAbsolutePath())); try (ResultSet resultSet = statement.executeQuery("select count(*) from root.** group by level=1,2")) { @@ -212,6 +272,25 @@ public void testLoad() throws Exception { Assert.fail("This ResultSet is empty."); } } + + Map isAligned = new HashMap<>(); + isAligned.put(SchemaConfig.DEVICE_0, "false"); + isAligned.put(SchemaConfig.DEVICE_1, "true"); + isAligned.put(SchemaConfig.DEVICE_2, "false"); + isAligned.put(SchemaConfig.DEVICE_3, "false"); + isAligned.put(SchemaConfig.DEVICE_4, "true"); + try (ResultSet resultSet = statement.executeQuery("show devices")) { + int size = 0; + while (resultSet.next()) { + size += 1; + String device = resultSet.getString("devices"); + Assert.assertEquals(isAligned.get(device), resultSet.getString("isAligned")); + } + Assert.assertEquals(isAligned.size(), size); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Parse result set error."); + } } } @@ -222,39 +301,39 @@ private static class SchemaConfig { // device 0, nonaligned, sg 0 private static final String DEVICE_0 = "root.sg.test_0.d_0"; private static final MeasurementSchema MEASUREMENT_00 = - new MeasurementSchema("sensor_00", TSDataType.INT32); + new MeasurementSchema("sensor_00", TSDataType.INT32, TSEncoding.RLE); private static final MeasurementSchema MEASUREMENT_01 = - new MeasurementSchema("sensor_01", TSDataType.INT64); + new MeasurementSchema("sensor_01", TSDataType.INT64, TSEncoding.RLE); private static final MeasurementSchema MEASUREMENT_02 = - new MeasurementSchema("sensor_02", TSDataType.DOUBLE); + new MeasurementSchema("sensor_02", TSDataType.DOUBLE, TSEncoding.GORILLA); private static final MeasurementSchema MEASUREMENT_03 = - new MeasurementSchema("sensor_03", TSDataType.TEXT); + new MeasurementSchema("sensor_03", TSDataType.TEXT, TSEncoding.PLAIN); // device 1, aligned, sg 0 private static final String DEVICE_1 = "root.sg.test_0.a_1"; private static final MeasurementSchema MEASUREMENT_10 = - new MeasurementSchema("sensor_10", TSDataType.INT32); + new MeasurementSchema("sensor_10", TSDataType.INT32, TSEncoding.RLE); private static final MeasurementSchema MEASUREMENT_11 = - new MeasurementSchema("sensor_11", TSDataType.INT64); + new MeasurementSchema("sensor_11", TSDataType.INT64, TSEncoding.RLE); private static final MeasurementSchema MEASUREMENT_12 = - new MeasurementSchema("sensor_12", TSDataType.DOUBLE); + new MeasurementSchema("sensor_12", TSDataType.DOUBLE, TSEncoding.GORILLA); private static final MeasurementSchema MEASUREMENT_13 = - new MeasurementSchema("sensor_13", TSDataType.TEXT); + new MeasurementSchema("sensor_13", TSDataType.TEXT, TSEncoding.PLAIN); // device 2, non aligned, sg 1 private static final String DEVICE_2 = "root.sg.test_1.d_2"; private static final MeasurementSchema MEASUREMENT_20 = - new MeasurementSchema("sensor_20", TSDataType.INT32); + new MeasurementSchema("sensor_20", TSDataType.INT32, TSEncoding.RLE); // device 3, non aligned, sg 1 private static final String DEVICE_3 = "root.sg.test_1.d_3"; private static final MeasurementSchema MEASUREMENT_30 = - new MeasurementSchema("sensor_30", TSDataType.INT32); + new MeasurementSchema("sensor_30", TSDataType.INT32, TSEncoding.RLE); // device 4, aligned, sg 1 private static final String DEVICE_4 = "root.sg.test_1.a_4"; private static final MeasurementSchema MEASUREMENT_40 = - new MeasurementSchema("sensor_40", TSDataType.INT32); + new MeasurementSchema("sensor_40", TSDataType.INT32, TSEncoding.RLE); } public class TsFileGenerator implements AutoCloseable { diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java index 087d329380ef7..45214bb4353e4 100644 --- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java +++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java @@ -1955,6 +1955,8 @@ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate( PartialPath devicePath, String[] measurements, Function getDataType, + TSEncoding[] encodings, + CompressionType[] compressionTypes, boolean aligned) throws MetadataException { throw new UnsupportedOperationException(); diff --git a/server/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java b/server/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java new file mode 100644 index 0000000000000..1603c9872f4cb --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception; + +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.rpc.TSStatusCode; + +public class VerifyMetadataException extends IoTDBException { + public VerifyMetadataException( + String path, String compareInfo, String tsFileInfo, String tsFilePath, String IoTDBInfo) { + super( + String.format( + "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.", + path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo), + TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode()); + } + + public VerifyMetadataException(String message) { + super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode()); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java index 55a5ecbe8f262..dec6efc6467f5 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java @@ -44,7 +44,9 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.dataset.ShowDevicesResult; import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.utils.Pair; import java.io.File; @@ -393,6 +395,8 @@ DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate( PartialPath devicePath, String[] measurements, Function getDataType, + TSEncoding[] encodings, + CompressionType[] compressionTypes, boolean aligned) throws MetadataException; // endregion diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java index 28cc5feeeaa85..97a77bb97fc81 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java @@ -1869,6 +1869,30 @@ private void internalCreateTimeseries(PartialPath path, TSDataType dataType) Collections.emptyMap()); } + /** create timeseries ignoring PathAlreadyExistException */ + private void internalCreateTimeseries( + PartialPath path, TSDataType dataType, TSEncoding encoding, CompressionType compressor) + throws MetadataException { + if (encoding == null) { + encoding = getDefaultEncoding(dataType); + } + if (compressor == null) { + compressor = TSFileDescriptor.getInstance().getConfig().getCompressor(); + } + createTimeseries(path, dataType, encoding, compressor, Collections.emptyMap()); + } + + /** create aligned timeseries ignoring PathAlreadyExistException */ + private void internalAlignedCreateTimeseries( + PartialPath prefixPath, + List measurements, + List dataTypes, + List encodings, + List compressors) + throws MetadataException { + createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings, compressors); + } + /** create aligned timeseries ignoring PathAlreadyExistException */ private void internalAlignedCreateTimeseries( PartialPath prefixPath, List measurements, List dataTypes) @@ -1887,6 +1911,8 @@ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate( PartialPath devicePath, String[] measurements, Function getDataType, + TSEncoding[] encodings, + CompressionType[] compressionTypes, boolean aligned) throws MetadataException { try { @@ -1898,14 +1924,23 @@ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate( if (measurementMNode == null) { if (config.isAutoCreateSchemaEnabled()) { if (aligned) { + TSDataType dataType = getDataType.apply(i); internalAlignedCreateTimeseries( devicePath, Collections.singletonList(measurements[i]), - Collections.singletonList(getDataType.apply(i))); - + Collections.singletonList(dataType), + Collections.singletonList( + encodings[i] == null ? getDefaultEncoding(dataType) : encodings[i]), + Collections.singletonList( + compressionTypes[i] == null + ? TSFileDescriptor.getInstance().getConfig().getCompressor() + : compressionTypes[i])); } else { internalCreateTimeseries( - devicePath.concatNode(measurements[i]), getDataType.apply(i)); + devicePath.concatNode(measurements[i]), + getDataType.apply(i), + encodings[i], + compressionTypes[i]); } // after creating timeseries, the deviceMNode has been replaced by a new entityMNode deviceMNode = mtree.getNodeByPath(devicePath); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java index 657fd9b50685e..bf2ada9695a8a 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java @@ -1663,6 +1663,8 @@ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate( PartialPath devicePath, String[] measurements, Function getDataType, + TSEncoding[] encodings, + CompressionType[] compressionTypes, boolean aligned) throws MetadataException { throw new UnsupportedOperationException(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java index 980ccbeac76fe..c0df8bc3cbcb1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java @@ -28,8 +28,12 @@ import org.apache.iotdb.commons.partition.SchemaPartition; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngineV2; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; +import org.apache.iotdb.db.exception.LoadFileException; +import org.apache.iotdb.db.exception.VerifyMetadataException; import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeException; import org.apache.iotdb.db.exception.sql.MeasurementNotExistException; import org.apache.iotdb.db.exception.sql.SemanticException; @@ -42,6 +46,8 @@ import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory; import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo; import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree; +import org.apache.iotdb.db.mpp.plan.Coordinator; +import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; import org.apache.iotdb.db.mpp.plan.expression.Expression; import org.apache.iotdb.db.mpp.plan.expression.ExpressionType; import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand; @@ -79,6 +85,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement; +import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement; @@ -96,13 +103,23 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkTypeStatement; +import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.db.utils.FileLoaderUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.iotdb.tsfile.file.header.ChunkHeader; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.filter.GroupByFilter; import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory; import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,6 +127,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1436,29 +1454,49 @@ public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryC Map device2MinTime = new HashMap<>(); Map device2MaxTime = new HashMap<>(); + Map> device2Schemas = new HashMap<>(); + Map> device2IsAligned = new HashMap<>(); + + // analyze tsfile metadata for (File tsFile : loadTsFileStatement.getTsFiles()) { try { - TsFileResource resource = new TsFileResource(tsFile); - FileLoaderUtils.loadOrGenerateResource(resource); - for (String device : resource.getDevices()) { - device2MinTime.put( - device, - Math.min( - device2MinTime.getOrDefault(device, Long.MAX_VALUE), - resource.getStartTime(device))); - device2MaxTime.put( - device, - Math.max( - device2MaxTime.getOrDefault(device, Long.MIN_VALUE), - resource.getEndTime(device))); - } - } catch (IOException e) { + TsFileResource resource = + analyzeTsFile( + loadTsFileStatement, + tsFile, + device2MinTime, + device2MaxTime, + device2Schemas, + device2IsAligned); + loadTsFileStatement.addTsFileResource(resource); + } catch (Exception e) { logger.error(String.format("Parse file %s to resource error.", tsFile.getPath()), e); throw new SemanticException( String.format("Parse file %s to resource error", tsFile.getPath())); } } + // auto create and verify schema + if (loadTsFileStatement.isVerifySchema() || loadTsFileStatement.isAutoCreateSchema()) { + try { + if (loadTsFileStatement.isVerifySchema()) { + verifyLoadingMeasurements(device2Schemas); + } + autoCreateSg(loadTsFileStatement.getSgLevel(), device2Schemas); + ISchemaTree schemaTree = autoCreateSchema(device2Schemas, device2IsAligned); + if (loadTsFileStatement.isVerifySchema()) { + verifySchema(schemaTree, device2Schemas, device2IsAligned); + } + } catch (Exception e) { + logger.error("Auto create or verify schema error.", e); + throw new SemanticException( + String.format( + "Auto create or verify schema error when executing statement %s.", + loadTsFileStatement)); + } + } + + // construct partition info List params = new ArrayList<>(); for (Map.Entry entry : device2MinTime.entrySet()) { List timePartitionSlots = new ArrayList<>(); @@ -1485,6 +1523,260 @@ public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryC return analysis; } + private TsFileResource analyzeTsFile( + LoadTsFileStatement statement, + File tsFile, + Map device2MinTime, + Map device2MaxTime, + Map> device2Schemas, + Map> device2IsAligned) + throws IOException, VerifyMetadataException { + try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { + Map> device2Metadata = reader.getAllTimeseriesMetadata(true); + + if (statement.isAutoCreateSchema() || statement.isVerifySchema()) { + // construct schema + for (Map.Entry> entry : device2Metadata.entrySet()) { + String device = entry.getKey(); + List timeseriesMetadataList = entry.getValue(); + boolean isAligned = false; + for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { + TSDataType dataType = timeseriesMetadata.getTSDataType(); + if (!dataType.equals(TSDataType.VECTOR)) { + ChunkHeader chunkHeader = + getChunkHeaderByTimeseriesMetadata(reader, timeseriesMetadata); + MeasurementSchema measurementSchema = + new MeasurementSchema( + timeseriesMetadata.getMeasurementId(), + dataType, + chunkHeader.getEncodingType(), + chunkHeader.getCompressionType()); + device2Schemas + .computeIfAbsent(device, o -> new HashMap<>()) + .put(measurementSchema, tsFile); + } else { + isAligned = true; + } + } + boolean finalIsAligned = isAligned; + if (!device2IsAligned + .computeIfAbsent(device, o -> new Pair<>(finalIsAligned, tsFile)) + .left + .equals(isAligned)) { + throw new VerifyMetadataException( + String.format( + "Device %s has different aligned definition in tsFile %s and other TsFile.", + device, tsFile.getParentFile())); + } + } + } + + // construct TsFileResource + TsFileResource resource = new TsFileResource(tsFile); + FileLoaderUtils.updateTsFileResource(device2Metadata, resource); + resource.updatePlanIndexes(reader.getMinPlanIndex()); + resource.updatePlanIndexes(reader.getMaxPlanIndex()); + + // construct device time range + for (String device : resource.getDevices()) { + device2MinTime.put( + device, + Math.min( + device2MinTime.getOrDefault(device, Long.MAX_VALUE), + resource.getStartTime(device))); + device2MaxTime.put( + device, + Math.max( + device2MaxTime.getOrDefault(device, Long.MIN_VALUE), resource.getEndTime(device))); + } + + resource.setStatus(TsFileResourceStatus.CLOSED); + return resource; + } + } + + private ChunkHeader getChunkHeaderByTimeseriesMetadata( + TsFileSequenceReader reader, TimeseriesMetadata timeseriesMetadata) throws IOException { + IChunkMetadata chunkMetadata = timeseriesMetadata.getChunkMetadataList().get(0); + reader.position(chunkMetadata.getOffsetOfChunkHeader()); + return reader.readChunkHeader(reader.readMarker()); + } + + private void autoCreateSg(int sgLevel, Map> device2Schemas) + throws VerifyMetadataException, LoadFileException, IllegalPathException { + sgLevel += 1; // e.g. "root.sg" means sgLevel = 1, "root.sg.test" means sgLevel=2 + Set sgSet = new HashSet<>(); + for (String device : device2Schemas.keySet()) { + PartialPath devicePath = new PartialPath(device); + + String[] nodes = devicePath.getNodes(); + String[] sgNodes = new String[sgLevel]; + if (nodes.length < sgLevel) { + throw new VerifyMetadataException( + String.format("Sg level %d is longer than device %s.", sgLevel, device)); + } + for (int i = 0; i < sgLevel; i++) { + sgNodes[i] = nodes[i]; + } + PartialPath sgPath = new PartialPath(sgNodes); + sgSet.add(sgPath); + } + + for (PartialPath sgPath : sgSet) { + SetStorageGroupStatement statement = new SetStorageGroupStatement(); + statement.setStorageGroupPath(sgPath); + executeSetStorageGroupStatement(statement); + } + } + + private void executeSetStorageGroupStatement(Statement statement) throws LoadFileException { + long queryId = SessionManager.getInstance().requestQueryId(false); + ExecutionResult result = + Coordinator.getInstance() + .execute( + statement, + queryId, + null, + "", + partitionFetcher, + schemaFetcher, + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && result.status.code != TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode()) { + logger.error(String.format("Set Storage group error, statement: %s.", statement)); + logger.error(String.format("Set storage group result status : %s.", result.status)); + throw new LoadFileException( + String.format("Can not execute set storage group statement: %s", statement)); + } + } + + private ISchemaTree autoCreateSchema( + Map> device2Schemas, + Map> device2IsAligned) + throws IllegalPathException { + List deviceList = new ArrayList<>(); + List measurementList = new ArrayList<>(); + List dataTypeList = new ArrayList<>(); + List encodingsList = new ArrayList<>(); + List compressionTypesList = new ArrayList<>(); + List isAlignedList = new ArrayList<>(); + + for (Map.Entry> entry : device2Schemas.entrySet()) { + int measurementSize = entry.getValue().size(); + String[] measurements = new String[measurementSize]; + TSDataType[] tsDataTypes = new TSDataType[measurementSize]; + TSEncoding[] encodings = new TSEncoding[measurementSize]; + CompressionType[] compressionTypes = new CompressionType[measurementSize]; + + int index = 0; + for (MeasurementSchema measurementSchema : entry.getValue().keySet()) { + measurements[index] = measurementSchema.getMeasurementId(); + tsDataTypes[index] = measurementSchema.getType(); + encodings[index] = measurementSchema.getEncodingType(); + compressionTypes[index++] = measurementSchema.getCompressor(); + } + + deviceList.add(new PartialPath(entry.getKey())); + measurementList.add(measurements); + dataTypeList.add(tsDataTypes); + encodingsList.add(encodings); + compressionTypesList.add(compressionTypes); + isAlignedList.add(device2IsAligned.get(entry.getKey()).left); + } + + return SchemaValidator.validate( + deviceList, + measurementList, + dataTypeList, + encodingsList, + compressionTypesList, + isAlignedList); + } + + private void verifyLoadingMeasurements(Map> device2Schemas) + throws VerifyMetadataException { + for (Map.Entry> deviceEntry : device2Schemas.entrySet()) { + Map id2Schema = new HashMap<>(); + Map schema2TsFile = deviceEntry.getValue(); + for (Map.Entry entry : schema2TsFile.entrySet()) { + String measurementId = entry.getKey().getMeasurementId(); + if (!id2Schema.containsKey(measurementId)) { + id2Schema.put(measurementId, entry.getKey()); + } else { + MeasurementSchema conflictSchema = id2Schema.get(measurementId); + String msg = + String.format( + "Measurement %s Conflict, TsFile %s has measurement: %s, TsFile %s has measurement %s.", + deviceEntry.getKey() + measurementId, + entry.getValue().getPath(), + entry.getKey(), + schema2TsFile.get(conflictSchema).getPath(), + conflictSchema); + logger.error(msg); + throw new VerifyMetadataException(msg); + } + } + } + } + + private void verifySchema( + ISchemaTree schemaTree, + Map> device2Schemas, + Map> device2IsAligned) + throws VerifyMetadataException, IllegalPathException { + for (Map.Entry> entry : device2Schemas.entrySet()) { + String device = entry.getKey(); + MeasurementSchema[] tsFileSchemas = + entry.getValue().keySet().toArray(new MeasurementSchema[0]); + DeviceSchemaInfo schemaInfo = + schemaTree.searchDeviceSchemaInfo( + new PartialPath(device), + Arrays.stream(tsFileSchemas) + .map(MeasurementSchema::getMeasurementId) + .collect(Collectors.toList())); + if (schemaInfo.isAligned() != device2IsAligned.get(device).left) { + throw new VerifyMetadataException( + device, + "Is aligned", + device2IsAligned.get(device).left.toString(), + device2IsAligned.get(device).right.getPath(), + String.valueOf(schemaInfo.isAligned())); + } + List originSchemaList = schemaInfo.getMeasurementSchemaList(); + int measurementSize = originSchemaList.size(); + for (int j = 0; j < measurementSize; j++) { + MeasurementSchema originSchema = originSchemaList.get(j); + MeasurementSchema tsFileSchema = tsFileSchemas[j]; + String measurementPath = + device + TsFileConstant.PATH_SEPARATOR + originSchema.getMeasurementId(); + if (!tsFileSchema.getType().equals(originSchema.getType())) { + throw new VerifyMetadataException( + measurementPath, + "Datatype", + tsFileSchema.getType().name(), + entry.getValue().get(tsFileSchema).getPath(), + originSchema.getType().name()); + } + if (!tsFileSchema.getEncodingType().equals(originSchema.getEncodingType())) { + throw new VerifyMetadataException( + measurementPath, + "Encoding", + tsFileSchema.getEncodingType().name(), + entry.getValue().get(tsFileSchema).getPath(), + originSchema.getEncodingType().name()); + } + if (!tsFileSchema.getCompressor().equals(originSchema.getCompressor())) { + throw new VerifyMetadataException( + measurementPath, + "Compress type", + tsFileSchema.getCompressor().name(), + entry.getValue().get(tsFileSchema).getPath(), + originSchema.getCompressor().name()); + } + } + } + } + @Override public Analysis visitShowTimeSeries( ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java index 643e8a374bc16..d80ef21db791f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java @@ -208,6 +208,8 @@ public ISchemaTree fetchSchemaWithAutoCreate( indexOfMissingMeasurements, measurements, getDataType, + null, + null, isAligned); schemaTree.mergeSchemaTree(missingSchemaTree); @@ -225,7 +227,20 @@ public ISchemaTree fetchSchemaListWithAutoCreate( List measurementsList, List tsDataTypesList, List isAlignedList) { + return fetchSchemaListWithAutoCreate( + devicePathList, measurementsList, tsDataTypesList, null, null, isAlignedList); + } + + @Override + public ISchemaTree fetchSchemaListWithAutoCreate( + List devicePathList, + List measurementsList, + List tsDataTypesList, + List encodingsList, + List compressionTypesList, + List isAlignedList) { schemaCache.takeReadLock(); + try { ClusterSchemaTree schemaTree = new ClusterSchemaTree(); PathPatternTree patternTree = new PathPatternTree(); @@ -264,6 +279,8 @@ public ISchemaTree fetchSchemaListWithAutoCreate( indexOfMissingMeasurementsList.get(i), measurementsList.get(i), index -> tsDataTypesList.get(finalI)[index], + encodingsList == null ? null : encodingsList.get(i), + compressionTypesList == null ? null : compressionTypesList.get(i), isAlignedList.get(i)); schemaTree.mergeSchemaTree(missingSchemaTree); schemaCache.put(missingSchemaTree); @@ -295,6 +312,8 @@ private ClusterSchemaTree checkAndAutoCreateMissingMeasurements( List indexOfMissingMeasurements, String[] measurements, Function getDataType, + TSEncoding[] encodings, + CompressionType[] compressionTypes, boolean isAligned) { DeviceSchemaInfo deviceSchemaInfo = schemaTree.searchDeviceSchemaInfo( @@ -356,15 +375,31 @@ private ClusterSchemaTree checkAndAutoCreateMissingMeasurements( List missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size()); List dataTypesOfMissingMeasurement = new ArrayList<>(indexOfMissingMeasurements.size()); + List encodingsOfMissingMeasurement = + new ArrayList<>(indexOfMissingMeasurements.size()); + List compressionTypesOfMissingMeasurement = + new ArrayList<>(indexOfMissingMeasurements.size()); indexOfMissingMeasurements.forEach( index -> { + TSDataType tsDataType = getDataType.apply(index); missingMeasurements.add(measurements[index]); - dataTypesOfMissingMeasurement.add(getDataType.apply(index)); + dataTypesOfMissingMeasurement.add(tsDataType); + encodingsOfMissingMeasurement.add( + encodings == null ? getDefaultEncoding(tsDataType) : encodings[index]); + compressionTypesOfMissingMeasurement.add( + compressionTypes == null + ? TSFileDescriptor.getInstance().getConfig().getCompressor() + : compressionTypes[index]); }); schemaTree.mergeSchemaTree( internalCreateTimeseries( - devicePath, missingMeasurements, dataTypesOfMissingMeasurement, isAligned)); + devicePath, + missingMeasurements, + dataTypesOfMissingMeasurement, + encodingsOfMissingMeasurement, + compressionTypesOfMissingMeasurement, + isAligned)); return schemaTree; } @@ -392,15 +427,9 @@ private ClusterSchemaTree internalCreateTimeseries( PartialPath devicePath, List measurements, List tsDataTypes, + List encodings, + List compressors, boolean isAligned) { - - List encodings = new ArrayList<>(); - List compressors = new ArrayList<>(); - for (TSDataType dataType : tsDataTypes) { - encodings.add(getDefaultEncoding(dataType)); - compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor()); - } - List measurementPathList = executeInternalCreateTimeseriesStatement( new InternalCreateTimeSeriesStatement( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java index 4e478b18fdf00..79208ae4f6d06 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java @@ -28,7 +28,9 @@ import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode; import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode; import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -111,6 +113,17 @@ public ISchemaTree fetchSchemaListWithAutoCreate( return null; } + @Override + public ISchemaTree fetchSchemaListWithAutoCreate( + List devicePath, + List measurements, + List tsDataTypes, + List encodings, + List compressionTypes, + List aligned) { + return null; + } + @Override public Pair checkTemplateSetInfo(PartialPath path) { return null; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java index 1e319efaec26f..a688949cbb2ba 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java @@ -23,7 +23,9 @@ import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.db.metadata.template.Template; import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.utils.Pair; import java.util.List; @@ -49,6 +51,14 @@ ISchemaTree fetchSchemaListWithAutoCreate( List tsDataTypes, List aligned); + ISchemaTree fetchSchemaListWithAutoCreate( + List devicePath, + List measurements, + List tsDataTypes, + List encodings, + List compressionTypes, + List aligned); + Pair checkTemplateSetInfo(PartialPath path); Map checkAllRelatedTemplate(PartialPath pathPattern); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java index bb34ce306f38c..f40e82abb4f03 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java @@ -25,7 +25,9 @@ import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.BatchInsertNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import java.util.List; @@ -67,8 +69,10 @@ public static ISchemaTree validate( List devicePaths, List measurements, List dataTypes, + List encodings, + List compressionTypes, List isAlignedList) { return SCHEMA_FETCHER.fetchSchemaListWithAutoCreate( - devicePaths, measurements, dataTypes, isAlignedList); + devicePaths, measurements, dataTypes, encodings, compressionTypes, isAlignedList); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java index e056f4240bc74..f4060e7c4ed05 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java @@ -33,7 +33,9 @@ import org.apache.iotdb.db.mpp.common.schematree.DeviceGroupSchemaTree; import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo; import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.utils.Pair; import java.util.ArrayList; @@ -87,7 +89,13 @@ public ISchemaTree fetchSchemaWithAutoCreate( Function getDataType, boolean aligned) { DeviceSchemaInfo deviceSchemaInfo = - getDeviceSchemaInfoWithAutoCreate(devicePath, measurements, getDataType, aligned); + getDeviceSchemaInfoWithAutoCreate( + devicePath, + measurements, + getDataType, + new TSEncoding[measurements.length], + new CompressionType[measurements.length], + aligned); DeviceGroupSchemaTree schemaTree = new DeviceGroupSchemaTree(); schemaTree.addDeviceInfo(deviceSchemaInfo); return schemaTree; @@ -97,12 +105,14 @@ private DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate( PartialPath devicePath, String[] measurements, Function getDataType, + TSEncoding[] encodings, + CompressionType[] compressionTypes, boolean aligned) { try { SchemaRegionId schemaRegionId = localConfigNode.getBelongedSchemaRegionId(devicePath); ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId); return schemaRegion.getDeviceSchemaInfoWithAutoCreate( - devicePath, measurements, getDataType, aligned); + devicePath, measurements, getDataType, encodings, compressionTypes, aligned); } catch (MetadataException e) { throw new RuntimeException(e); } @@ -114,6 +124,18 @@ public ISchemaTree fetchSchemaListWithAutoCreate( List measurementsList, List tsDataTypesList, List isAlignedList) { + return fetchSchemaListWithAutoCreate( + devicePathList, measurementsList, tsDataTypesList, null, null, isAlignedList); + } + + @Override + public ISchemaTree fetchSchemaListWithAutoCreate( + List devicePathList, + List measurementsList, + List tsDataTypesList, + List encodingsList, + List compressionTypesList, + List isAlignedList) { Map> deviceMap = new HashMap<>(); for (int i = 0, size = devicePathList.size(); i < size; i++) { deviceMap.computeIfAbsent(devicePathList.get(i), k -> new ArrayList<>()).add(i); @@ -134,6 +156,8 @@ public ISchemaTree fetchSchemaListWithAutoCreate( String[] measurements = new String[totalSize]; TSDataType[] tsDataTypes = new TSDataType[totalSize]; + TSEncoding[] encodings = new TSEncoding[totalSize]; + CompressionType[] compressionTypes = new CompressionType[totalSize]; int curPos = 0; for (int index : entry.getValue()) { @@ -145,12 +169,29 @@ public ISchemaTree fetchSchemaListWithAutoCreate( measurementsList.get(index).length); System.arraycopy( tsDataTypesList.get(index), 0, tsDataTypes, curPos, tsDataTypesList.get(index).length); + if (encodingsList != null) { + System.arraycopy( + encodingsList.get(index), 0, encodings, curPos, encodingsList.get(index).length); + } + if (compressionTypesList != null) { + System.arraycopy( + compressionTypesList.get(index), + 0, + compressionTypes, + curPos, + compressionTypesList.get(index).length); + } curPos += measurementsList.get(index).length; } schemaTree.addDeviceInfo( getDeviceSchemaInfoWithAutoCreate( - entry.getKey(), measurements, index -> tsDataTypes[index], isAligned)); + entry.getKey(), + measurements, + index -> tsDataTypes[index], + encodings, + compressionTypes, + isAligned)); } return schemaTree; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java index 8561ad6842802..7ceee7a510fbc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java @@ -479,7 +479,7 @@ public PlanNode visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryCo @Override public PlanNode visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { return new LoadTsFileNode( - context.getQueryId().genPlanNodeId(), loadTsFileStatement.getTsFiles()); + context.getQueryId().genPlanNodeId(), loadTsFileStatement.getResources()); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java index 91519fe649769..21edd2ed0b46e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -23,20 +23,16 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; -import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.partition.DataPartition; -import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngineV2; import org.apache.iotdb.db.engine.load.AlignedChunkData; import org.apache.iotdb.db.engine.load.ChunkData; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.analyze.Analysis; -import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode; -import org.apache.iotdb.db.utils.FileLoaderUtils; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; @@ -85,15 +81,13 @@ public LoadSingleTsFileNode(PlanNodeId id) { super(id); } - public LoadSingleTsFileNode(PlanNodeId id, File tsFile) throws IOException { + public LoadSingleTsFileNode(PlanNodeId id, TsFileResource resource) { super(id); - this.tsFile = tsFile; - this.resource = new TsFileResource(tsFile); - - FileLoaderUtils.loadOrGenerateResource(resource); + this.tsFile = resource.getTsFile(); + this.resource = resource; } - public void checkIfNeedDecodeTsFile(DataPartition dataPartition) { + public void checkIfNeedDecodeTsFile(DataPartition dataPartition) throws IOException { Set allRegionReplicaSet = new HashSet<>(); needDecodeTsFile = false; for (String device : resource.getDevices()) { @@ -105,6 +99,9 @@ public void checkIfNeedDecodeTsFile(DataPartition dataPartition) { allRegionReplicaSet.addAll(dataPartition.getAllDataRegionReplicaSetForOneDevice(device)); } needDecodeTsFile = !isDispatchedToLocal(allRegionReplicaSet); + if (!needDecodeTsFile) { + resource.serialize(); + } } private boolean isDispatchedToLocal(Set replicaSets) { @@ -127,45 +124,6 @@ private boolean isDispatchedToLocal(TEndPoint endPoint) { && IoTDBDescriptor.getInstance().getConfig().getInternalPort() == endPoint.port; } - public void autoRegisterSchema() - throws IOException, IllegalPathException { // TODO: only support sg level=1 - try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { - List deviceList = new ArrayList<>(); - List measurementList = new ArrayList<>(); - List dataTypeList = new ArrayList<>(); - List isAlignedList = new ArrayList<>(); - - Map> device2Metadata = reader.getAllTimeseriesMetadata(true); - for (Map.Entry> entry : device2Metadata.entrySet()) { - deviceList.add(new PartialPath(entry.getKey())); - - List timeseriesMetadataList = entry.getValue(); - boolean isAligned = - timeseriesMetadataList.stream() - .mapToInt(o -> o.getTSDataType().equals(TSDataType.VECTOR) ? 1 : 0) - .sum() - != 0; - int measurementSize = timeseriesMetadataList.size() - (isAligned ? 1 : 0); - String[] measurements = new String[measurementSize]; - TSDataType[] tsDataTypes = new TSDataType[measurementSize]; - - int index = 0; - for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { - TSDataType dataType = timeseriesMetadata.getTSDataType(); - if (!dataType.equals(TSDataType.VECTOR)) { - measurements[index] = timeseriesMetadata.getMeasurementId(); - tsDataTypes[index++] = dataType; - } - } - measurementList.add(measurements); - dataTypeList.add(tsDataTypes); - isAlignedList.add(isAligned); - } - - SchemaValidator.validate(deviceList, measurementList, dataTypeList, isAlignedList); - } - } - public boolean needDecodeTsFile() { return needDecodeTsFile; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java index c991a9c4768b8..c0f6f8b8b540f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.load; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.analyze.Analysis; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; @@ -30,7 +31,6 @@ import org.slf4j.LoggerFactory; import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -39,15 +39,15 @@ public class LoadTsFileNode extends WritePlanNode { private static final Logger logger = LoggerFactory.getLogger(LoadTsFileNode.class); - private final List tsFiles; + private final List resources; public LoadTsFileNode(PlanNodeId id) { this(id, new ArrayList<>()); } - public LoadTsFileNode(PlanNodeId id, List tsFiles) { + public LoadTsFileNode(PlanNodeId id, List resources) { super(id); - this.tsFiles = tsFiles; + this.resources = resources; } @Override @@ -87,18 +87,16 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException { @Override public List splitByPartition(Analysis analysis) { List res = new ArrayList<>(); - for (File file : tsFiles) { + for (TsFileResource resource : resources) { try { - LoadSingleTsFileNode singleTsFileNode = new LoadSingleTsFileNode(getPlanNodeId(), file); + LoadSingleTsFileNode singleTsFileNode = new LoadSingleTsFileNode(getPlanNodeId(), resource); singleTsFileNode.checkIfNeedDecodeTsFile(analysis.getDataPartitionInfo()); - singleTsFileNode.autoRegisterSchema(); - if (singleTsFileNode.needDecodeTsFile()) { singleTsFileNode.splitTsFileByDataPartition(analysis.getDataPartitionInfo()); } res.add(singleTsFileNode); } catch (Exception e) { - logger.error(String.format("Parse TsFile %s error", file.getPath()), e); + logger.error(String.format("Parse TsFile %s error", resource.getTsFile().getPath()), e); } } return res; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java index 4fb39cad1a71b..f5cd7e90cf8e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.statement.Statement; import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; @@ -40,14 +41,16 @@ public class LoadTsFileStatement extends Statement { private boolean verifySchema; private List tsFiles; + private List resources; public LoadTsFileStatement(String filePath) { this.file = new File(filePath); this.autoCreateSchema = true; this.sgLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(); this.verifySchema = true; + this.tsFiles = new ArrayList<>(); + this.resources = new ArrayList<>(); - tsFiles = new ArrayList<>(); if (file.isFile()) { tsFiles.add(file); } else { @@ -98,10 +101,30 @@ public void setVerifySchema(boolean verifySchema) { this.verifySchema = verifySchema; } + public boolean isVerifySchema() { + return verifySchema; + } + + public boolean isAutoCreateSchema() { + return autoCreateSchema; + } + + public int getSgLevel() { + return sgLevel; + } + public List getTsFiles() { return tsFiles; } + public void addTsFileResource(TsFileResource resource) { + resources.add(resource); + } + + public List getResources() { + return resources; + } + @Override public List getPaths() { return Collections.emptyList(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java index 28e95a0c6b31a..799dd03c3a7b2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java @@ -97,4 +97,20 @@ public Integer getDataReplicationFactor() { public Long getTimePartitionInterval() { return timePartitionInterval; } + + @Override + public String toString() { + return "SetStorageGroupStatement{" + + "storageGroupPath=" + + storageGroupPath + + ", ttl=" + + ttl + + ", schemaReplicationFactor=" + + schemaReplicationFactor + + ", dataReplicationFactor=" + + dataReplicationFactor + + ", timePartitionInterval=" + + timePartitionInterval + + '}'; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 35d8500107b6b..fd3e136486686 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -44,6 +44,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -68,8 +69,14 @@ public static void loadOrGenerateResource(TsFileResource tsFileResource) throws public static void updateTsFileResource( TsFileSequenceReader reader, TsFileResource tsFileResource) throws IOException { - for (Entry> entry : - reader.getAllTimeseriesMetadata(false).entrySet()) { + updateTsFileResource(reader.getAllTimeseriesMetadata(false), tsFileResource); + tsFileResource.updatePlanIndexes(reader.getMinPlanIndex()); + tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex()); + } + + public static void updateTsFileResource( + Map> device2Metadata, TsFileResource tsFileResource) { + for (Entry> entry : device2Metadata.entrySet()) { for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) { tsFileResource.updateStartTime( entry.getKey(), timeseriesMetaData.getStatistics().getStartTime()); @@ -77,8 +84,6 @@ public static void updateTsFileResource( entry.getKey(), timeseriesMetaData.getStatistics().getEndTime()); } } - tsFileResource.updatePlanIndexes(reader.getMinPlanIndex()); - tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex()); } /** diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 8e714f4d1acd6..1004d6e666757 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -72,7 +72,8 @@ public enum TSStatusCode { CREATE_TEMPLATE_ERROR(340), SYNC_FILE_REBASE(341), SYNC_FILE_ERROR(342), - MEASUREMENT_IN_BLACK_LIST(343), + VERIFY_METADATA_ERROR(343), + MEASUREMENT_IN_BLACK_LIST(344), EXECUTE_STATEMENT_ERROR(400), SQL_PARSE_ERROR(401),