From 41446967cfa3a3e1c1f82ffd78d021484299b34f Mon Sep 17 00:00:00 2001 From: MarcosZyk <1534661820@qq.com> Date: Tue, 14 Mar 2023 16:37:33 +0800 Subject: [PATCH 1/5] fix dispatch result collection --- .../plan/scheduler/AsyncPlanNodeSender.java | 38 +++++++++++++++++++ .../FragmentInstanceDispatcherImpl.java | 29 +++++++++++--- 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java index f19de2323f098..0d4a84edad786 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; @@ -32,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -87,6 +89,42 @@ public void waitUntilCompleted() throws InterruptedException { } } + public List getFailureStatusList() { + List failureStatusList = new ArrayList<>(); + TSStatus status; + for (Map.Entry entry : instanceId2RespMap.entrySet()) { + status = entry.getValue().getStatus(); + if (!entry.getValue().accepted) { + if (status == null) { + logger.warn( + "dispatch write failed. message: {}, node {}", + entry.getValue().message, + instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint()); + failureStatusList.add( + RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage())); + } else { + logger.warn( + "dispatch write failed. status: {}, code: {}, message: {}, node {}", + entry.getValue().status, + TSStatusCode.representOf(status.code), + entry.getValue().message, + instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint()); + if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + failureStatusList.addAll(status.getSubStatus()); + } else { + failureStatusList.add(status); + } + } + } else { + // some expected and accepted status except SUCCESS_STATUS need to be returned + if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failureStatusList.add(status); + } + } + } + return failureStatusList; + } + public Future getResult() { for (Map.Entry entry : instanceId2RespMap.entrySet()) { if (!entry.getValue().accepted) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index 3226884b56985..357afc685d36e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -177,19 +177,26 @@ private Future dispatchWriteAsync(List failureStatusList = new ArrayList<>(); + // sync dispatch to local long localScheduleStartTime = System.nanoTime(); for (FragmentInstance localInstance : localInstances) { try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) { dispatchOneInstance(localInstance); } catch (FragmentInstanceDispatchException e) { - return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus())); + TSStatus failureStatus = e.getFailureStatus(); + if (failureStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + failureStatusList.addAll(failureStatus.getSubStatus()); + } else { + failureStatusList.add(failureStatus); + } } catch (Throwable t) { logger.warn("[DispatchFailed]", t); - return immediateFuture( - new FragInstanceDispatchResult( - RpcUtils.getStatus( - TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()))); + failureStatusList.add( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())); } } PerformanceOverviewMetricsManager.recordScheduleLocalCost( @@ -205,7 +212,17 @@ private Future dispatchWriteAsync(List Date: Wed, 15 Mar 2023 09:24:11 +0800 Subject: [PATCH 2/5] fix dispatch result collection --- .../iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java index 0d4a84edad786..9a3b92b45ca36 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java @@ -118,7 +118,11 @@ public List getFailureStatusList() { } else { // some expected and accepted status except SUCCESS_STATUS need to be returned if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - failureStatusList.add(status); + if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + failureStatusList.addAll(status.getSubStatus()); + } else { + failureStatusList.add(status); + } } } } From 4f94443960e3a94295fae6704e8395e06203e290 Mon Sep 17 00:00:00 2001 From: MarcosZyk <1534661820@qq.com> Date: Wed, 15 Mar 2023 11:18:17 +0800 Subject: [PATCH 3/5] fix MeasurementGroup bug --- .../write/CreateMultiTimeSeriesNode.java | 6 ++-- .../node/metedata/write/MeasurementGroup.java | 36 +++++++------------ 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java index a739020e3083e..32445aee12828 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java @@ -83,8 +83,10 @@ public CreateMultiTimeSeriesNode( measurementGroupMap.put(devicePath, measurementGroup); } - measurementGroup.addMeasurement( - paths.get(i).getMeasurement(), dataTypes.get(i), encodings.get(i), compressors.get(i)); + if (!measurementGroup.addMeasurement( + paths.get(i).getMeasurement(), dataTypes.get(i), encodings.get(i), compressors.get(i))) { + continue; + } if (propsList != null) { measurementGroup.addProps(propsList.get(i)); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java index f5bd2ccc52399..2c118f01f397c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -44,6 +45,8 @@ public class MeasurementGroup { private List> tagsList; private List> attributesList; + private final transient Set measurementSet = new HashSet<>(); + public List getMeasurements() { return measurements; } @@ -80,15 +83,20 @@ public List> getAttributesList() { return attributesList; } - public void addMeasurement( + public boolean addMeasurement( String measurement, TSDataType dataType, TSEncoding encoding, CompressionType compressionType) { + if (measurementSet.contains(measurement)) { + return false; + } measurements.add(measurement); + measurementSet.add(measurement); dataTypes.add(dataType); encodings.add(encoding); compressors.add(compressionType); + return true; } public void addAlias(String alias) { @@ -119,29 +127,6 @@ public void addAttributes(Map attributes) { attributesList.add(attributes); } - public void removeMeasurement(int index) { - measurements.remove(index); - dataTypes.remove(index); - encodings.remove(index); - compressors.remove(index); - - if (aliasList != null) { - aliasList.remove(index); - } - - if (propsList != null) { - propsList.remove(index); - } - - if (tagsList != null) { - tagsList.remove(index); - } - - if (attributesList != null) { - attributesList.remove(index); - } - } - public void removeMeasurements(Set indexSet) { int restSize = this.measurements.size() - indexSet.size(); List measurements = new ArrayList<>(restSize); @@ -156,6 +141,7 @@ public void removeMeasurements(Set indexSet) { for (int i = 0; i < this.measurements.size(); i++) { if (indexSet.contains(i)) { + measurementSet.remove(measurements.get(i)); continue; } measurements.add(this.measurements.get(i)); @@ -217,6 +203,7 @@ private MeasurementGroup getSubMeasurementGroup(int startIndex, int endIndex) { MeasurementGroup subMeasurementGroup; subMeasurementGroup = new MeasurementGroup(); subMeasurementGroup.measurements = measurements.subList(startIndex, endIndex); + subMeasurementGroup.measurementSet.addAll(subMeasurementGroup.measurements); subMeasurementGroup.dataTypes = dataTypes.subList(startIndex, endIndex); subMeasurementGroup.encodings = encodings.subList(startIndex, endIndex); subMeasurementGroup.compressors = compressors.subList(startIndex, endIndex); @@ -359,6 +346,7 @@ public void deserialize(ByteBuffer byteBuffer) { for (int i = 0; i < size; i++) { measurements.add(ReadWriteIOUtils.readString(byteBuffer)); } + measurementSet.addAll(measurements); dataTypes = new ArrayList<>(); for (int i = 0; i < size; i++) { From 58334e7424d991c635cd2e742503e23171c57fef Mon Sep 17 00:00:00 2001 From: MarcosZyk <1534661820@qq.com> Date: Wed, 15 Mar 2023 15:14:15 +0800 Subject: [PATCH 4/5] fix execution result process --- .../executor/RegionWriteExecutor.java | 149 +++++++----------- 1 file changed, 56 insertions(+), 93 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java index 71e870cc67031..69ca7a4f3b158 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java @@ -462,53 +462,10 @@ public RegionExecutionResult visitInternalCreateTimeSeries( } measurementGroup.removeMeasurements(failingMeasurementMap.keySet()); - RegionExecutionResult executionResult = - super.visitInternalCreateTimeSeries(node, context); - - if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) { - return executionResult; - } - - TSStatus executionStatus = executionResult.getStatus(); - - // separate the measurement_already_exist exception and other exceptions process, - // measurement_already_exist exception is acceptable due to concurrent timeseries creation - if (failingStatus.isEmpty()) { - if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - if (executionStatus.getSubStatus().get(0).getCode() - == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - // there's only measurement_already_exist exception - alreadyExistingStatus.addAll(executionStatus.getSubStatus()); - } else { - failingStatus.addAll(executionStatus.getSubStatus()); - } - } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - failingStatus.add(executionStatus); - } - } else { - if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - if (executionStatus.getSubStatus().get(0).getCode() - != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - failingStatus.addAll(executionStatus.getSubStatus()); - } - } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - failingStatus.add(executionStatus); - } - } - - RegionExecutionResult result = new RegionExecutionResult(); - TSStatus status; - if (failingStatus.isEmpty()) { - status = RpcUtils.getStatus(alreadyExistingStatus); - result.setAccepted(true); - } else { - status = RpcUtils.getStatus(failingStatus); - result.setAccepted(false); - } - - result.setMessage(status.getMessage()); - result.setStatus(status); - return result; + return processExecutionResultOfInternalCreateSchema( + super.visitInternalCreateTimeSeries(node, context), + failingStatus, + alreadyExistingStatus); } finally { context.getRegionWriteValidationRWLock().writeLock().unlock(); } @@ -562,59 +519,65 @@ public RegionExecutionResult visitInternalCreateMultiTimeSeries( measurementGroup.removeMeasurements(failingMeasurementMap.keySet()); } - RegionExecutionResult executionResult = - super.visitInternalCreateMultiTimeSeries(node, context); - - if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) { - return executionResult; - } - - TSStatus executionStatus = executionResult.getStatus(); + return processExecutionResultOfInternalCreateSchema( + super.visitInternalCreateMultiTimeSeries(node, context), + failingStatus, + alreadyExistingStatus); + } finally { + context.getRegionWriteValidationRWLock().writeLock().unlock(); + } + } else { + return super.visitInternalCreateMultiTimeSeries(node, context); + } + } - // separate the measurement_already_exist exception and other exceptions process, - // measurement_already_exist exception is acceptable due to concurrent timeseries creation - if (failingStatus.isEmpty()) { - if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - if (executionStatus.getSubStatus().get(0).getCode() - == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - // there's only measurement_already_exist exception - alreadyExistingStatus.addAll(executionStatus.getSubStatus()); - } else { - failingStatus.addAll(executionStatus.getSubStatus()); - } - } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - failingStatus.add(executionStatus); - } + private RegionExecutionResult processExecutionResultOfInternalCreateSchema( + RegionExecutionResult executionResult, + List failingStatus, + List alreadyExistingStatus) { + TSStatus executionStatus = executionResult.getStatus(); + + // separate the measurement_already_exist exception and other exceptions process, + // measurement_already_exist exception is acceptable due to concurrent timeseries creation + if (failingStatus.isEmpty()) { + if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + if (executionStatus.getSubStatus().get(0).getCode() + == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { + // there's only measurement_already_exist exception + alreadyExistingStatus.addAll(executionStatus.getSubStatus()); } else { - if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - if (executionStatus.getSubStatus().get(0).getCode() - != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - failingStatus.addAll(executionStatus.getSubStatus()); - } - } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - failingStatus.add(executionStatus); - } + failingStatus.addAll(executionStatus.getSubStatus()); } - - RegionExecutionResult result = new RegionExecutionResult(); - TSStatus status; - if (failingStatus.isEmpty()) { - status = RpcUtils.getStatus(alreadyExistingStatus); - result.setAccepted(true); - } else { - status = RpcUtils.getStatus(failingStatus); - result.setAccepted(false); + } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failingStatus.add(executionStatus); + } + } else { + if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + if (executionStatus.getSubStatus().get(0).getCode() + != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { + failingStatus.addAll(executionStatus.getSubStatus()); } - - result.setMessage(status.getMessage()); - result.setStatus(status); - return result; - } finally { - context.getRegionWriteValidationRWLock().writeLock().unlock(); + } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failingStatus.add(executionStatus); } + } + + RegionExecutionResult result = new RegionExecutionResult(); + TSStatus status; + if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) { + status = RpcUtils.SUCCESS_STATUS; + result.setAccepted(true); + } else if (failingStatus.isEmpty()) { + status = RpcUtils.getStatus(alreadyExistingStatus); + result.setAccepted(true); } else { - return super.visitInternalCreateMultiTimeSeries(node, context); + status = RpcUtils.getStatus(failingStatus); + result.setAccepted(false); } + + result.setMessage(status.getMessage()); + result.setStatus(status); + return result; } @Override From 36411c20eb8afaea39ab3d5e172a6192f3ccb3fe Mon Sep 17 00:00:00 2001 From: MarcosZyk <1534661820@qq.com> Date: Wed, 15 Mar 2023 22:55:32 +0800 Subject: [PATCH 5/5] fix execution result process --- .../node/metedata/write/MeasurementGroup.java | 2 +- .../plan/scheduler/AsyncPlanNodeSender.java | 12 ++------- .../FragmentInstanceDispatcherImpl.java | 27 ++++++++++--------- 3 files changed, 18 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java index 2c118f01f397c..5123f0ba0b6b7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/MeasurementGroup.java @@ -141,7 +141,7 @@ public void removeMeasurements(Set indexSet) { for (int i = 0; i < this.measurements.size(); i++) { if (indexSet.contains(i)) { - measurementSet.remove(measurements.get(i)); + measurementSet.remove(this.measurements.get(i)); continue; } measurements.add(this.measurements.get(i)); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java index 9a3b92b45ca36..29a5f80f62885 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java @@ -109,20 +109,12 @@ public List getFailureStatusList() { TSStatusCode.representOf(status.code), entry.getValue().message, instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint()); - if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - failureStatusList.addAll(status.getSubStatus()); - } else { - failureStatusList.add(status); - } + failureStatusList.add(status); } } else { // some expected and accepted status except SUCCESS_STATUS need to be returned if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - failureStatusList.addAll(status.getSubStatus()); - } else { - failureStatusList.add(status); - } + failureStatusList.add(status); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index 357afc685d36e..0719b86452d9d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -178,7 +178,7 @@ private Future dispatchWriteAsync(List failureStatusList = new ArrayList<>(); + List dataNodeFailureList = new ArrayList<>(); // sync dispatch to local long localScheduleStartTime = System.nanoTime(); @@ -186,15 +186,10 @@ private Future dispatchWriteAsync(List dispatchWriteAsync(List failureStatusList = new ArrayList<>(); + for (TSStatus dataNodeFailure : dataNodeFailureList) { + if (dataNodeFailure.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + failureStatusList.addAll(dataNodeFailure.getSubStatus()); + } else { + failureStatusList.add(dataNodeFailure); + } + } return immediateFuture(new FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList))); } }