Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
bcaaa91
create statement and analysis
yschengzi Jul 25, 2022
8adbf0e
Merge remote-tracking branch 'apache/master' into IOTDB-3656#mppload
yschengzi Aug 8, 2022
98a2286
finish coordinator and scheduler
yschengzi Aug 14, 2022
e07f821
working on dispatcher
yschengzi Aug 15, 2022
be7bd56
working on dispatcher
yschengzi Aug 15, 2022
1e35b5c
finish two phase
yschengzi Aug 15, 2022
4a062b0
finish framework
yschengzi Aug 16, 2022
47a0de0
working on serialize
yschengzi Aug 17, 2022
c3b4b3e
finish nonaligned
yschengzi Aug 17, 2022
4380fa8
finish serialize
yschengzi Aug 22, 2022
50b63be
spotless
yschengzi Aug 22, 2022
5e158ad
Merge remote-tracking branch 'apache/master' into IOTDB-3656#mppload
yschengzi Aug 22, 2022
2fa85e1
finish first phase
yschengzi Aug 22, 2022
519b5ec
finish local dispatcher
yschengzi Aug 22, 2022
7251f20
finish data region interface
yschengzi Aug 23, 2022
42dbf51
finish entire chunk
yschengzi Aug 25, 2022
eda0975
finish debug
yschengzi Aug 25, 2022
340fc27
Merge remote-tracking branch 'apache/master' into IOTDB-3656#mppload
yschengzi Aug 26, 2022
c5b69f3
merge master
yschengzi Aug 26, 2022
265f390
reset usage for debug
yschengzi Aug 29, 2022
bc6fbd4
Merge remote-tracking branch 'apache/master' into IOTDB-3656#mppload
yschengzi Aug 29, 2022
c9d2e95
finish IT
yschengzi Aug 29, 2022
ccc8bc9
reset TsFileSequenceRead
yschengzi Aug 30, 2022
b9ba16b
rename method and set log
yschengzi Aug 31, 2022
2771583
finsih auto create schema
yschengzi Sep 1, 2022
8ed9e70
finish debug
yschengzi Sep 4, 2022
010f0c6
finish test
yschengzi Sep 7, 2022
0c111b7
finish device and measurements
yschengzi Sep 9, 2022
517fbd2
finsih auto create
yschengzi Sep 9, 2022
99e1242
Merge remote-tracking branch 'apache/master' into IOTDB-3656#mppload
yschengzi Sep 13, 2022
67aeecf
merge to master
yschengzi Sep 13, 2022
558b69b
Merge branch 'IOTDB-3656#mppload' into IOTDB-3656#loadExt
yschengzi Sep 13, 2022
871e024
Merge branch 'IOTDB-3656#mppload' into IOTDB-3656#autoCreate
yschengzi Sep 13, 2022
51b4021
Merge branch 'IOTDB-3656#loadExt' into IOTDB-3656#autoCreate
yschengzi Sep 13, 2022
2edab4a
finsih auto register
yschengzi Sep 14, 2022
84d33d3
add resource serialize
yschengzi Sep 14, 2022
233b32d
add doc
yschengzi Sep 14, 2022
c813f24
Merge branch 'IOTDB-3656#mppload' into IOTDB-3656#autoCreate
yschengzi Sep 14, 2022
d47a0f9
finsih IT
yschengzi Sep 14, 2022
5e14e5c
Merge remote-tracking branch 'apache/master' into IOTDB-3656#mppload
yschengzi Sep 15, 2022
61cd7b9
Merge branch 'IOTDB-3656#mppload' into IOTDB-3656#autoCreate
yschengzi Sep 15, 2022
7d39ee9
fix SchemaRegionMemoryImpl
yschengzi Sep 15, 2022
ab5af3d
spotless
yschengzi Sep 16, 2022
502c59d
fix Collections.singletonlist
yschengzi Sep 16, 2022
ee41ff2
Merge remote-tracking branch 'apache/master' into IOTDB-3656#mppload
yschengzi Sep 16, 2022
5eb1e9b
spotless
yschengzi Sep 16, 2022
7e132a6
Merge branch 'IOTDB-3656#mppload' into IOTDB-3656#autoCreate
yschengzi Sep 16, 2022
a7c8bb0
spotless
yschengzi Sep 16, 2022
bf0e43d
Merge remote-tracking branch 'apache/master' into IOTDB-3656#autoCreate
yschengzi Sep 19, 2022
d69fc73
fix merge
yschengzi Sep 19, 2022
855366f
fix init class
yschengzi Sep 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.TsFileWriter;
Expand Down Expand Up @@ -72,8 +73,6 @@ public void setUp() throws Exception {
originPartitionInterval = ConfigFactory.getConfig().getPartitionInterval();
ConfigFactory.getConfig().setPartitionInterval(PARTITION_INTERVAL);
EnvFactory.getEnv().initBeforeTest();

registerSchema();
}

@After
Expand Down Expand Up @@ -159,6 +158,8 @@ private boolean deleteDir() {

@Test
public void testLoad() throws Exception {
registerSchema();

long writtenPoint1 = 0;
// device 0, device 1, sg 0
try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
Expand Down Expand Up @@ -199,7 +200,66 @@ public void testLoad() throws Exception {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {

statement.execute(String.format("load \"%s\"", tmpDir.getAbsolutePath()));
statement.execute(String.format("load \"%s\" sglevel=2", tmpDir.getAbsolutePath()));

try (ResultSet resultSet =
statement.executeQuery("select count(*) from root.** group by level=1,2")) {
if (resultSet.next()) {
long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
Assert.assertEquals(writtenPoint1, sg1Count);
long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)");
Assert.assertEquals(writtenPoint2, sg2Count);
} else {
Assert.fail("This ResultSet is empty.");
}
}
}
}

@Test
public void testLoadWithAutoRegister() throws Exception {
long writtenPoint1 = 0;
// device 0, device 1, sg 0
try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
generator.registerTimeseries(
new Path(SchemaConfig.DEVICE_0),
Arrays.asList(
SchemaConfig.MEASUREMENT_00,
SchemaConfig.MEASUREMENT_01,
SchemaConfig.MEASUREMENT_02,
SchemaConfig.MEASUREMENT_03));
generator.registerAlignedTimeseries(
new Path(SchemaConfig.DEVICE_1),
Arrays.asList(
SchemaConfig.MEASUREMENT_10,
SchemaConfig.MEASUREMENT_11,
SchemaConfig.MEASUREMENT_12,
SchemaConfig.MEASUREMENT_13));
generator.generateData(new Path(SchemaConfig.DEVICE_0), 10000, false);
generator.generateData(new Path(SchemaConfig.DEVICE_1), 10000, true);
writtenPoint1 = generator.getTotalNumber();
}

long writtenPoint2 = 0;
// device 2, device 3, device4, sg 1
try (TsFileGenerator generator = new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
generator.registerTimeseries(
new Path(SchemaConfig.DEVICE_2), Arrays.asList(SchemaConfig.MEASUREMENT_20));
generator.registerTimeseries(
new Path(SchemaConfig.DEVICE_3), Arrays.asList(SchemaConfig.MEASUREMENT_30));
generator.registerAlignedTimeseries(
new Path(SchemaConfig.DEVICE_4), Arrays.asList(SchemaConfig.MEASUREMENT_40));
generator.generateData(new Path(SchemaConfig.DEVICE_2), 10000, false);
generator.generateData(new Path(SchemaConfig.DEVICE_3), 10000, false);
generator.generateData(new Path(SchemaConfig.DEVICE_4), 10000, true);
writtenPoint2 = generator.getTotalNumber();
}

try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {

statement.execute(
String.format("load \"%s\" sglevel=2,autoregister=true", tmpDir.getAbsolutePath()));

try (ResultSet resultSet =
statement.executeQuery("select count(*) from root.** group by level=1,2")) {
Expand All @@ -212,6 +272,25 @@ public void testLoad() throws Exception {
Assert.fail("This ResultSet is empty.");
}
}

Map<String, String> isAligned = new HashMap<>();
isAligned.put(SchemaConfig.DEVICE_0, "false");
isAligned.put(SchemaConfig.DEVICE_1, "true");
isAligned.put(SchemaConfig.DEVICE_2, "false");
isAligned.put(SchemaConfig.DEVICE_3, "false");
isAligned.put(SchemaConfig.DEVICE_4, "true");
try (ResultSet resultSet = statement.executeQuery("show devices")) {
int size = 0;
while (resultSet.next()) {
size += 1;
String device = resultSet.getString("devices");
Assert.assertEquals(isAligned.get(device), resultSet.getString("isAligned"));
}
Assert.assertEquals(isAligned.size(), size);
} catch (Exception e) {
e.printStackTrace();
Assert.fail("Parse result set error.");
}
}
}

Expand All @@ -222,39 +301,39 @@ private static class SchemaConfig {
// device 0, nonaligned, sg 0
private static final String DEVICE_0 = "root.sg.test_0.d_0";
private static final MeasurementSchema MEASUREMENT_00 =
new MeasurementSchema("sensor_00", TSDataType.INT32);
new MeasurementSchema("sensor_00", TSDataType.INT32, TSEncoding.RLE);
private static final MeasurementSchema MEASUREMENT_01 =
new MeasurementSchema("sensor_01", TSDataType.INT64);
new MeasurementSchema("sensor_01", TSDataType.INT64, TSEncoding.RLE);
private static final MeasurementSchema MEASUREMENT_02 =
new MeasurementSchema("sensor_02", TSDataType.DOUBLE);
new MeasurementSchema("sensor_02", TSDataType.DOUBLE, TSEncoding.GORILLA);
private static final MeasurementSchema MEASUREMENT_03 =
new MeasurementSchema("sensor_03", TSDataType.TEXT);
new MeasurementSchema("sensor_03", TSDataType.TEXT, TSEncoding.PLAIN);

// device 1, aligned, sg 0
private static final String DEVICE_1 = "root.sg.test_0.a_1";
private static final MeasurementSchema MEASUREMENT_10 =
new MeasurementSchema("sensor_10", TSDataType.INT32);
new MeasurementSchema("sensor_10", TSDataType.INT32, TSEncoding.RLE);
private static final MeasurementSchema MEASUREMENT_11 =
new MeasurementSchema("sensor_11", TSDataType.INT64);
new MeasurementSchema("sensor_11", TSDataType.INT64, TSEncoding.RLE);
private static final MeasurementSchema MEASUREMENT_12 =
new MeasurementSchema("sensor_12", TSDataType.DOUBLE);
new MeasurementSchema("sensor_12", TSDataType.DOUBLE, TSEncoding.GORILLA);
private static final MeasurementSchema MEASUREMENT_13 =
new MeasurementSchema("sensor_13", TSDataType.TEXT);
new MeasurementSchema("sensor_13", TSDataType.TEXT, TSEncoding.PLAIN);

// device 2, non aligned, sg 1
private static final String DEVICE_2 = "root.sg.test_1.d_2";
private static final MeasurementSchema MEASUREMENT_20 =
new MeasurementSchema("sensor_20", TSDataType.INT32);
new MeasurementSchema("sensor_20", TSDataType.INT32, TSEncoding.RLE);

// device 3, non aligned, sg 1
private static final String DEVICE_3 = "root.sg.test_1.d_3";
private static final MeasurementSchema MEASUREMENT_30 =
new MeasurementSchema("sensor_30", TSDataType.INT32);
new MeasurementSchema("sensor_30", TSDataType.INT32, TSEncoding.RLE);

// device 4, aligned, sg 1
private static final String DEVICE_4 = "root.sg.test_1.a_4";
private static final MeasurementSchema MEASUREMENT_40 =
new MeasurementSchema("sensor_40", TSDataType.INT32);
new MeasurementSchema("sensor_40", TSDataType.INT32, TSEncoding.RLE);
}

public class TsFileGenerator implements AutoCloseable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1955,6 +1955,8 @@ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
PartialPath devicePath,
String[] measurements,
Function<Integer, TSDataType> getDataType,
TSEncoding[] encodings,
CompressionType[] compressionTypes,
boolean aligned)
throws MetadataException {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.exception;

import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.rpc.TSStatusCode;

public class VerifyMetadataException extends IoTDBException {
public VerifyMetadataException(
String path, String compareInfo, String tsFileInfo, String tsFilePath, String IoTDBInfo) {
super(
String.format(
"%s %s mismatch, %s in tsfile %s, but %s in IoTDB.",
path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo),
TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
}

public VerifyMetadataException(String message) {
super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;

import java.io.File;
Expand Down Expand Up @@ -393,6 +395,8 @@ DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
PartialPath devicePath,
String[] measurements,
Function<Integer, TSDataType> getDataType,
TSEncoding[] encodings,
CompressionType[] compressionTypes,
boolean aligned)
throws MetadataException;
// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1869,6 +1869,30 @@ private void internalCreateTimeseries(PartialPath path, TSDataType dataType)
Collections.emptyMap());
}

/** create timeseries ignoring PathAlreadyExistException */
private void internalCreateTimeseries(
PartialPath path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)
throws MetadataException {
if (encoding == null) {
encoding = getDefaultEncoding(dataType);
}
if (compressor == null) {
compressor = TSFileDescriptor.getInstance().getConfig().getCompressor();
}
createTimeseries(path, dataType, encoding, compressor, Collections.emptyMap());
}

/** create aligned timeseries ignoring PathAlreadyExistException */
private void internalAlignedCreateTimeseries(
PartialPath prefixPath,
List<String> measurements,
List<TSDataType> dataTypes,
List<TSEncoding> encodings,
List<CompressionType> compressors)
throws MetadataException {
createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings, compressors);
}

/** create aligned timeseries ignoring PathAlreadyExistException */
private void internalAlignedCreateTimeseries(
PartialPath prefixPath, List<String> measurements, List<TSDataType> dataTypes)
Expand All @@ -1887,6 +1911,8 @@ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
PartialPath devicePath,
String[] measurements,
Function<Integer, TSDataType> getDataType,
TSEncoding[] encodings,
CompressionType[] compressionTypes,
boolean aligned)
throws MetadataException {
try {
Expand All @@ -1898,14 +1924,23 @@ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
if (measurementMNode == null) {
if (config.isAutoCreateSchemaEnabled()) {
if (aligned) {
TSDataType dataType = getDataType.apply(i);
internalAlignedCreateTimeseries(
devicePath,
Collections.singletonList(measurements[i]),
Collections.singletonList(getDataType.apply(i)));

Collections.singletonList(dataType),
Collections.singletonList(
encodings[i] == null ? getDefaultEncoding(dataType) : encodings[i]),
Collections.singletonList(
compressionTypes[i] == null
? TSFileDescriptor.getInstance().getConfig().getCompressor()
: compressionTypes[i]));
} else {
internalCreateTimeseries(
devicePath.concatNode(measurements[i]), getDataType.apply(i));
devicePath.concatNode(measurements[i]),
getDataType.apply(i),
encodings[i],
compressionTypes[i]);
}
// after creating timeseries, the deviceMNode has been replaced by a new entityMNode
deviceMNode = mtree.getNodeByPath(devicePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,8 @@ public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate(
PartialPath devicePath,
String[] measurements,
Function<Integer, TSDataType> getDataType,
TSEncoding[] encodings,
CompressionType[] compressionTypes,
boolean aligned)
throws MetadataException {
throw new UnsupportedOperationException();
Expand Down
Loading