From d587f6f8235785818686537f7b9d82529da7f875 Mon Sep 17 00:00:00 2001 From: Itami Sho <42286868+MiniSho@users.noreply.github.com> Date: Thu, 25 Apr 2024 00:14:29 +0800 Subject: [PATCH 1/3] init --- .../node/pipe/PipeEnrichedInsertNode.java | 5 +++ .../planner/plan/node/write/InsertNode.java | 12 +++++++ .../plan/node/write/InsertRowNode.java | 29 +++++++++++++++++ .../plan/node/write/InsertTabletNode.java | 32 +++++++++++++++++++ .../statement/crud/InsertBaseStatement.java | 8 ----- .../storageengine/dataregion/DataRegion.java | 8 +++-- .../constant/PipeConnectorConstant.java | 4 +++ .../connector/protocol/IoTDBConnector.java | 10 ++++++ 8 files changed, 97 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java index 596e771b21a48..b735397602ae4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java @@ -270,6 +270,11 @@ public void setFailedMeasurementNumber(int failedMeasurementNumber) { insertNode.setFailedMeasurementNumber(failedMeasurementNumber); } + @Override + public void setFailedMeasurementsIndex(List failedMeasurements) { + insertNode.setFailedMeasurementsIndex(failedMeasurements); + } + @Override public int getFailedMeasurementNumber() { return insertNode.getFailedMeasurementNumber(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index a53fe80b204a5..13487bdda2944 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -40,7 +40,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Objects; public abstract class InsertNode extends WritePlanNode implements ComparableConsensusRequest { @@ -60,6 +62,7 @@ public abstract class InsertNode extends WritePlanNode implements ComparableCons protected TSDataType[] dataTypes; protected int failedMeasurementNumber = 0; + protected List failedMeasurementsIndex; /** * device id reference, for reuse device id in both id table and memtable
@@ -95,6 +98,7 @@ protected InsertNode( this.isAligned = isAligned; this.measurements = measurements; this.dataTypes = dataTypes; + this.failedMeasurementsIndex = new ArrayList<>(); } public TRegionReplicaSet getDataRegionReplicaSet() { @@ -268,6 +272,14 @@ public boolean allMeasurementFailed() { } return true; } + + public void setFailedMeasurementsIndex(List failedMeasurementsIndex) { + this.failedMeasurementsIndex = failedMeasurementsIndex; + } + + public List getFailedMeasurementsIndex() { + return failedMeasurementsIndex; + } // endregion // region progress index diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index cc698321d7b69..567eef1a524ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -211,6 +211,35 @@ public void markFailedMeasurement(int index) { values[index] = null; } + public InsertRowNode reconstructWithoutFailedMeasurements() { + int failedMeasurementNumber = getFailedMeasurementNumber(); + if (failedMeasurementNumber == 0) { + return this; + } + String[] newMeasurements = new String[measurements.length - failedMeasurementNumber]; + TSDataType[] newDataTypes = new TSDataType[dataTypes.length - failedMeasurementNumber]; + Object[] newValues = new Object[values.length - failedMeasurementNumber]; + int j = 0; + for (int i = 0; i < measurements.length; i++) { + if (measurements[i] != null) { + newMeasurements[j] = measurements[i]; + newDataTypes[j] = dataTypes[i]; + newValues[j] = values[i]; + j++; + } + } + return new InsertRowNode( + id, + devicePath, + isAligned, + newMeasurements, + newDataTypes, + measurementSchemas, + time, + newValues, + isNeedInferType); + } + @Override protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.INSERT_ROW.serialize(byteBuffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index 9153336afebe4..ae72c86ee9292 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -286,6 +286,7 @@ public List splitByPartition(Analysis analysis) { values, subTimes.length); subNode.setFailedMeasurementNumber(getFailedMeasurementNumber()); + subNode.setFailedMeasurementsIndex(getFailedMeasurementsIndex()); subNode.setRange(locs); subNode.setDataRegionReplicaSet(entry.getKey()); result.add(subNode); @@ -358,6 +359,37 @@ public void markFailedMeasurement(int index) { columns[index] = null; } + public InsertTabletNode reconstructWithoutFailedMeasurements() { + int failedMeasurementNumber = getFailedMeasurementNumber(); + if (failedMeasurementNumber == 0) { + return this; + } + String[] newMeasurements = new String[measurements.length - failedMeasurementNumber]; + TSDataType[] newDataTypes = new TSDataType[dataTypes.length - failedMeasurementNumber]; + MeasurementSchema[] newMeasurementSchemas = + new MeasurementSchema[measurementSchemas.length - failedMeasurementNumber]; + Object[] newColumns = new Object[columns.length - failedMeasurementNumber]; + BitMap[] newBitMaps = new BitMap[bitMaps.length - failedMeasurementNumber]; + for (int i = 0, j = 0; i < measurements.length; i++) { + if (measurements[i] != null) { + newMeasurements[j] = measurements[i]; + newDataTypes[j] = dataTypes[i]; + newMeasurementSchemas[j] = measurementSchemas[i]; + newColumns[j] = columns[i]; + newBitMaps[j] = bitMaps[i]; + j++; + } + } + + // update the fields + this.measurements = newMeasurements; + this.dataTypes = newDataTypes; + this.measurementSchemas = newMeasurementSchemas; + this.columns = newColumns; + this.bitMaps = newBitMaps; + return this; + } + @Override public long getMinTime() { return times[0]; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java index bb2d5f5d7a64b..14e874e3b6b56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java @@ -238,14 +238,6 @@ public List getFailedMeasurements() { .collect(Collectors.toList()); } - public List getFailedExceptions() { - return failedMeasurementIndex2Info == null - ? Collections.emptyList() - : failedMeasurementIndex2Info.values().stream() - .map(info -> info.cause) - .collect(Collectors.toList()); - } - public List getFailedMessages() { return failedMeasurementIndex2Info == null ? Collections.emptyList() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 6c62a289902f7..267a5062b7d87 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1057,7 +1057,8 @@ private boolean insertTabletToTsFileProcessor( } try { - tsFileProcessor.insertTablet(insertTabletNode, start, end, results); + tsFileProcessor.insertTablet( + insertTabletNode.reconstructWithoutFailedMeasurements(), start, end, results); } catch (WriteProcessRejectException e) { logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage()); return false; @@ -1119,7 +1120,7 @@ private void insertToTsFileProcessor( return; } long[] costsForMetrics = new long[4]; - tsFileProcessor.insert(insertRowNode, costsForMetrics); + tsFileProcessor.insert(insertRowNode.reconstructWithoutFailedMeasurements(), costsForMetrics); PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); @@ -1182,7 +1183,8 @@ private void insertToTsFileProcessors( } tsFileProcessorMapForFlushing.put(tsFileProcessor, areSequence[i]); try { - tsFileProcessor.insert(insertRowNode, costsForMetrics); + tsFileProcessor.insert( + insertRowNode.reconstructWithoutFailedMeasurements(), costsForMetrics); } catch (WriteProcessException e) { insertRowsNode.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index bb9d075910dff..5c16a80ee2d61 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -65,6 +65,10 @@ public class PipeConnectorConstant { public static final String SINK_IOTDB_BATCH_SIZE_KEY = "sink.batch.size-bytes"; public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB; + public static final String CONNECTOR_IOTDB_PARTIAL_INSERT_KEY = "connector.partial-insert"; + public static final String SINK_IOTDB_PARTIAL_INSERT_KEY = "sink.partial-insert"; + public static final boolean CONNECTOR_IOTDB_PARTIAL_INSERT_DEFAULT_VALUE = false; + public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user"; public static final String SINK_IOTDB_USER_KEY = "sink.user"; public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index 65b32a4839692..a563dd528631e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -53,6 +53,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PARTIAL_INSERT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PARTIAL_INSERT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY; @@ -66,6 +68,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PARTIAL_INSERT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; @@ -84,6 +87,8 @@ public abstract class IoTDBConnector implements PipeConnector { protected boolean isTabletBatchModeEnabled = true; + protected boolean isPartialInsertEnabled = false; + protected PipeReceiverStatusHandler receiverStatusHandler; @Override @@ -156,6 +161,11 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE); LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled); + isPartialInsertEnabled = + parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_IOTDB_PARTIAL_INSERT_KEY, SINK_IOTDB_PARTIAL_INSERT_KEY), + CONNECTOR_IOTDB_PARTIAL_INSERT_DEFAULT_VALUE); + receiverStatusHandler = new PipeReceiverStatusHandler( parameters From 93d690f1b451cb690233257a68c056fab529fdd4 Mon Sep 17 00:00:00 2001 From: Itami Sho <42286868+MiniSho@users.noreply.github.com> Date: Thu, 25 Apr 2024 11:25:12 +0800 Subject: [PATCH 2/3] fix --- .../plan/planner/plan/node/write/InsertTabletNode.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index ae72c86ee9292..1c542086c6d6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -369,14 +369,19 @@ public InsertTabletNode reconstructWithoutFailedMeasurements() { MeasurementSchema[] newMeasurementSchemas = new MeasurementSchema[measurementSchemas.length - failedMeasurementNumber]; Object[] newColumns = new Object[columns.length - failedMeasurementNumber]; - BitMap[] newBitMaps = new BitMap[bitMaps.length - failedMeasurementNumber]; + BitMap[] newBitMaps = null; + if (bitMaps != null) { + newBitMaps = new BitMap[bitMaps.length - failedMeasurementNumber]; + } for (int i = 0, j = 0; i < measurements.length; i++) { if (measurements[i] != null) { newMeasurements[j] = measurements[i]; newDataTypes[j] = dataTypes[i]; newMeasurementSchemas[j] = measurementSchemas[i]; newColumns[j] = columns[i]; - newBitMaps[j] = bitMaps[i]; + if (bitMaps != null) { + newBitMaps[j] = bitMaps[i]; + } j++; } } From 3c444b0878ea865fdcb0a9818b420e10bac2b2a2 Mon Sep 17 00:00:00 2001 From: Steve Yurong Su Date: Fri, 26 Apr 2024 15:01:19 +0800 Subject: [PATCH 3/3] remove necessary --- .../pipe/config/constant/PipeConnectorConstant.java | 4 ---- .../pipe/connector/protocol/IoTDBConnector.java | 10 ---------- 2 files changed, 14 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index 5c16a80ee2d61..bb9d075910dff 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -65,10 +65,6 @@ public class PipeConnectorConstant { public static final String SINK_IOTDB_BATCH_SIZE_KEY = "sink.batch.size-bytes"; public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB; - public static final String CONNECTOR_IOTDB_PARTIAL_INSERT_KEY = "connector.partial-insert"; - public static final String SINK_IOTDB_PARTIAL_INSERT_KEY = "sink.partial-insert"; - public static final boolean CONNECTOR_IOTDB_PARTIAL_INSERT_DEFAULT_VALUE = false; - public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user"; public static final String SINK_IOTDB_USER_KEY = "sink.user"; public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index a563dd528631e..65b32a4839692 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -53,8 +53,6 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PARTIAL_INSERT_DEFAULT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PARTIAL_INSERT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY; @@ -68,7 +66,6 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PARTIAL_INSERT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; @@ -87,8 +84,6 @@ public abstract class IoTDBConnector implements PipeConnector { protected boolean isTabletBatchModeEnabled = true; - protected boolean isPartialInsertEnabled = false; - protected PipeReceiverStatusHandler receiverStatusHandler; @Override @@ -161,11 +156,6 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE); LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled); - isPartialInsertEnabled = - parameters.getBooleanOrDefault( - Arrays.asList(CONNECTOR_IOTDB_PARTIAL_INSERT_KEY, SINK_IOTDB_PARTIAL_INSERT_KEY), - CONNECTOR_IOTDB_PARTIAL_INSERT_DEFAULT_VALUE); - receiverStatusHandler = new PipeReceiverStatusHandler( parameters