Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -462,53 +462,10 @@ public RegionExecutionResult visitInternalCreateTimeSeries(
}
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());

RegionExecutionResult executionResult =
super.visitInternalCreateTimeSeries(node, context);

if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
return executionResult;
}

TSStatus executionStatus = executionResult.getStatus();

// separate the measurement_already_exist exception and other exceptions process,
// measurement_already_exist exception is acceptable due to concurrent timeseries creation
if (failingStatus.isEmpty()) {
if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
if (executionStatus.getSubStatus().get(0).getCode()
== TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
// there's only measurement_already_exist exception
alreadyExistingStatus.addAll(executionStatus.getSubStatus());
} else {
failingStatus.addAll(executionStatus.getSubStatus());
}
} else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failingStatus.add(executionStatus);
}
} else {
if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
if (executionStatus.getSubStatus().get(0).getCode()
!= TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
failingStatus.addAll(executionStatus.getSubStatus());
}
} else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failingStatus.add(executionStatus);
}
}

RegionExecutionResult result = new RegionExecutionResult();
TSStatus status;
if (failingStatus.isEmpty()) {
status = RpcUtils.getStatus(alreadyExistingStatus);
result.setAccepted(true);
} else {
status = RpcUtils.getStatus(failingStatus);
result.setAccepted(false);
}

result.setMessage(status.getMessage());
result.setStatus(status);
return result;
return processExecutionResultOfInternalCreateSchema(
super.visitInternalCreateTimeSeries(node, context),
failingStatus,
alreadyExistingStatus);
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
}
Expand Down Expand Up @@ -562,59 +519,65 @@ public RegionExecutionResult visitInternalCreateMultiTimeSeries(
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
}

RegionExecutionResult executionResult =
super.visitInternalCreateMultiTimeSeries(node, context);

if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
return executionResult;
}

TSStatus executionStatus = executionResult.getStatus();
return processExecutionResultOfInternalCreateSchema(
super.visitInternalCreateMultiTimeSeries(node, context),
failingStatus,
alreadyExistingStatus);
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
}
} else {
return super.visitInternalCreateMultiTimeSeries(node, context);
}
}

// separate the measurement_already_exist exception and other exceptions process,
// measurement_already_exist exception is acceptable due to concurrent timeseries creation
if (failingStatus.isEmpty()) {
if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
if (executionStatus.getSubStatus().get(0).getCode()
== TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
// there's only measurement_already_exist exception
alreadyExistingStatus.addAll(executionStatus.getSubStatus());
} else {
failingStatus.addAll(executionStatus.getSubStatus());
}
} else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failingStatus.add(executionStatus);
}
private RegionExecutionResult processExecutionResultOfInternalCreateSchema(
RegionExecutionResult executionResult,
List<TSStatus> failingStatus,
List<TSStatus> alreadyExistingStatus) {
TSStatus executionStatus = executionResult.getStatus();

// separate the measurement_already_exist exception and other exceptions process,
// measurement_already_exist exception is acceptable due to concurrent timeseries creation
if (failingStatus.isEmpty()) {
if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
if (executionStatus.getSubStatus().get(0).getCode()
== TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
// there's only measurement_already_exist exception
alreadyExistingStatus.addAll(executionStatus.getSubStatus());
} else {
if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
if (executionStatus.getSubStatus().get(0).getCode()
!= TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
failingStatus.addAll(executionStatus.getSubStatus());
}
} else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failingStatus.add(executionStatus);
}
failingStatus.addAll(executionStatus.getSubStatus());
}

RegionExecutionResult result = new RegionExecutionResult();
TSStatus status;
if (failingStatus.isEmpty()) {
status = RpcUtils.getStatus(alreadyExistingStatus);
result.setAccepted(true);
} else {
status = RpcUtils.getStatus(failingStatus);
result.setAccepted(false);
} else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failingStatus.add(executionStatus);
}
} else {
if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
if (executionStatus.getSubStatus().get(0).getCode()
!= TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
failingStatus.addAll(executionStatus.getSubStatus());
}

result.setMessage(status.getMessage());
result.setStatus(status);
return result;
} finally {
context.getRegionWriteValidationRWLock().writeLock().unlock();
} else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failingStatus.add(executionStatus);
}
}

RegionExecutionResult result = new RegionExecutionResult();
TSStatus status;
if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
status = RpcUtils.SUCCESS_STATUS;
result.setAccepted(true);
} else if (failingStatus.isEmpty()) {
status = RpcUtils.getStatus(alreadyExistingStatus);
result.setAccepted(true);
} else {
return super.visitInternalCreateMultiTimeSeries(node, context);
status = RpcUtils.getStatus(failingStatus);
result.setAccepted(false);
}

result.setMessage(status.getMessage());
result.setStatus(status);
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ public CreateMultiTimeSeriesNode(
measurementGroupMap.put(devicePath, measurementGroup);
}

measurementGroup.addMeasurement(
paths.get(i).getMeasurement(), dataTypes.get(i), encodings.get(i), compressors.get(i));
if (!measurementGroup.addMeasurement(
paths.get(i).getMeasurement(), dataTypes.get(i), encodings.get(i), compressors.get(i))) {
continue;
}

if (propsList != null) {
measurementGroup.addProps(propsList.get(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -44,6 +45,8 @@ public class MeasurementGroup {
private List<Map<String, String>> tagsList;
private List<Map<String, String>> attributesList;

private final transient Set<String> measurementSet = new HashSet<>();

public List<String> getMeasurements() {
return measurements;
}
Expand Down Expand Up @@ -80,15 +83,20 @@ public List<Map<String, String>> getAttributesList() {
return attributesList;
}

public void addMeasurement(
public boolean addMeasurement(
String measurement,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressionType) {
if (measurementSet.contains(measurement)) {
return false;
}
measurements.add(measurement);
measurementSet.add(measurement);
dataTypes.add(dataType);
encodings.add(encoding);
compressors.add(compressionType);
return true;
}

public void addAlias(String alias) {
Expand Down Expand Up @@ -119,29 +127,6 @@ public void addAttributes(Map<String, String> attributes) {
attributesList.add(attributes);
}

public void removeMeasurement(int index) {
measurements.remove(index);
dataTypes.remove(index);
encodings.remove(index);
compressors.remove(index);

if (aliasList != null) {
aliasList.remove(index);
}

if (propsList != null) {
propsList.remove(index);
}

if (tagsList != null) {
tagsList.remove(index);
}

if (attributesList != null) {
attributesList.remove(index);
}
}

public void removeMeasurements(Set<Integer> indexSet) {
int restSize = this.measurements.size() - indexSet.size();
List<String> measurements = new ArrayList<>(restSize);
Expand All @@ -156,6 +141,7 @@ public void removeMeasurements(Set<Integer> indexSet) {

for (int i = 0; i < this.measurements.size(); i++) {
if (indexSet.contains(i)) {
measurementSet.remove(this.measurements.get(i));
continue;
}
measurements.add(this.measurements.get(i));
Expand Down Expand Up @@ -217,6 +203,7 @@ private MeasurementGroup getSubMeasurementGroup(int startIndex, int endIndex) {
MeasurementGroup subMeasurementGroup;
subMeasurementGroup = new MeasurementGroup();
subMeasurementGroup.measurements = measurements.subList(startIndex, endIndex);
subMeasurementGroup.measurementSet.addAll(subMeasurementGroup.measurements);
subMeasurementGroup.dataTypes = dataTypes.subList(startIndex, endIndex);
subMeasurementGroup.encodings = encodings.subList(startIndex, endIndex);
subMeasurementGroup.compressors = compressors.subList(startIndex, endIndex);
Expand Down Expand Up @@ -359,6 +346,7 @@ public void deserialize(ByteBuffer byteBuffer) {
for (int i = 0; i < size; i++) {
measurements.add(ReadWriteIOUtils.readString(byteBuffer));
}
measurementSet.addAll(measurements);

dataTypes = new ArrayList<>();
for (int i = 0; i < size; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.scheduler;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
Expand All @@ -32,6 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -87,6 +89,38 @@ public void waitUntilCompleted() throws InterruptedException {
}
}

public List<TSStatus> getFailureStatusList() {
List<TSStatus> failureStatusList = new ArrayList<>();
TSStatus status;
for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
status = entry.getValue().getStatus();
if (!entry.getValue().accepted) {
if (status == null) {
logger.warn(
"dispatch write failed. message: {}, node {}",
entry.getValue().message,
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
failureStatusList.add(
RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage()));
} else {
logger.warn(
"dispatch write failed. status: {}, code: {}, message: {}, node {}",
entry.getValue().status,
TSStatusCode.representOf(status.code),
entry.getValue().message,
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
failureStatusList.add(status);
}
} else {
// some expected and accepted status except SUCCESS_STATUS need to be returned
if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failureStatusList.add(status);
}
}
}
return failureStatusList;
}

public Future<FragInstanceDispatchResult> getResult() {
for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
if (!entry.getValue().accepted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,21 @@ private Future<FragInstanceDispatchResult> dispatchWriteAsync(List<FragmentInsta
AsyncPlanNodeSender asyncPlanNodeSender =
new AsyncPlanNodeSender(asyncInternalServiceClientManager, remoteInstances);
asyncPlanNodeSender.sendAll();

List<TSStatus> dataNodeFailureList = new ArrayList<>();

// sync dispatch to local
long localScheduleStartTime = System.nanoTime();
for (FragmentInstance localInstance : localInstances) {
try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
dispatchOneInstance(localInstance);
} catch (FragmentInstanceDispatchException e) {
return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
dataNodeFailureList.add(e.getFailureStatus());
} catch (Throwable t) {
logger.warn("[DispatchFailed]", t);
return immediateFuture(
new FragInstanceDispatchResult(
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
dataNodeFailureList.add(
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()));
}
}
PerformanceOverviewMetricsManager.recordScheduleLocalCost(
Expand All @@ -205,7 +207,25 @@ private Future<FragInstanceDispatchResult> dispatchWriteAsync(List<FragmentInsta
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + e.getMessage())));
}
return asyncPlanNodeSender.getResult();

dataNodeFailureList.addAll(asyncPlanNodeSender.getFailureStatusList());

if (dataNodeFailureList.isEmpty()) {
return immediateFuture(new FragInstanceDispatchResult(true));
}
if (instances.size() == 1) {
return immediateFuture(new FragInstanceDispatchResult(dataNodeFailureList.get(0)));
} else {
List<TSStatus> failureStatusList = new ArrayList<>();
for (TSStatus dataNodeFailure : dataNodeFailureList) {
if (dataNodeFailure.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
failureStatusList.addAll(dataNodeFailure.getSubStatus());
} else {
failureStatusList.add(dataNodeFailure);
}
}
return immediateFuture(new FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
}
}

private void dispatchOneInstance(FragmentInstance instance)
Expand Down