Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ public void setFailedMeasurementNumber(int failedMeasurementNumber) {
insertNode.setFailedMeasurementNumber(failedMeasurementNumber);
}

@Override
public void setFailedMeasurementsIndex(List<Integer> failedMeasurements) {
insertNode.setFailedMeasurementsIndex(failedMeasurements);
}

@Override
public int getFailedMeasurementNumber() {
return insertNode.getFailedMeasurementNumber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public abstract class InsertNode extends WritePlanNode implements ComparableConsensusRequest {
Expand All @@ -60,6 +62,7 @@ public abstract class InsertNode extends WritePlanNode implements ComparableCons
protected TSDataType[] dataTypes;

protected int failedMeasurementNumber = 0;
protected List<Integer> failedMeasurementsIndex;

/**
* device id reference, for reuse device id in both id table and memtable <br>
Expand Down Expand Up @@ -95,6 +98,7 @@ protected InsertNode(
this.isAligned = isAligned;
this.measurements = measurements;
this.dataTypes = dataTypes;
this.failedMeasurementsIndex = new ArrayList<>();
}

public TRegionReplicaSet getDataRegionReplicaSet() {
Expand Down Expand Up @@ -268,6 +272,14 @@ public boolean allMeasurementFailed() {
}
return true;
}

public void setFailedMeasurementsIndex(List<Integer> failedMeasurementsIndex) {
this.failedMeasurementsIndex = failedMeasurementsIndex;
}

public List<Integer> getFailedMeasurementsIndex() {
return failedMeasurementsIndex;
}
// endregion

// region progress index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,35 @@ public void markFailedMeasurement(int index) {
values[index] = null;
}

public InsertRowNode reconstructWithoutFailedMeasurements() {
int failedMeasurementNumber = getFailedMeasurementNumber();
if (failedMeasurementNumber == 0) {
return this;
}
String[] newMeasurements = new String[measurements.length - failedMeasurementNumber];
TSDataType[] newDataTypes = new TSDataType[dataTypes.length - failedMeasurementNumber];
Object[] newValues = new Object[values.length - failedMeasurementNumber];
int j = 0;
for (int i = 0; i < measurements.length; i++) {
if (measurements[i] != null) {
newMeasurements[j] = measurements[i];
newDataTypes[j] = dataTypes[i];
newValues[j] = values[i];
j++;
}
}
return new InsertRowNode(
id,
devicePath,
isAligned,
newMeasurements,
newDataTypes,
measurementSchemas,
time,
newValues,
isNeedInferType);
}

@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.INSERT_ROW.serialize(byteBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public List<WritePlanNode> splitByPartition(Analysis analysis) {
values,
subTimes.length);
subNode.setFailedMeasurementNumber(getFailedMeasurementNumber());
subNode.setFailedMeasurementsIndex(getFailedMeasurementsIndex());
subNode.setRange(locs);
subNode.setDataRegionReplicaSet(entry.getKey());
result.add(subNode);
Expand Down Expand Up @@ -358,6 +359,42 @@ public void markFailedMeasurement(int index) {
columns[index] = null;
}

public InsertTabletNode reconstructWithoutFailedMeasurements() {
int failedMeasurementNumber = getFailedMeasurementNumber();
if (failedMeasurementNumber == 0) {
return this;
}
String[] newMeasurements = new String[measurements.length - failedMeasurementNumber];
TSDataType[] newDataTypes = new TSDataType[dataTypes.length - failedMeasurementNumber];
MeasurementSchema[] newMeasurementSchemas =
new MeasurementSchema[measurementSchemas.length - failedMeasurementNumber];
Object[] newColumns = new Object[columns.length - failedMeasurementNumber];
BitMap[] newBitMaps = null;
if (bitMaps != null) {
newBitMaps = new BitMap[bitMaps.length - failedMeasurementNumber];
}
for (int i = 0, j = 0; i < measurements.length; i++) {
if (measurements[i] != null) {
newMeasurements[j] = measurements[i];
newDataTypes[j] = dataTypes[i];
newMeasurementSchemas[j] = measurementSchemas[i];
newColumns[j] = columns[i];
if (bitMaps != null) {
newBitMaps[j] = bitMaps[i];
}
j++;
}
}

// update the fields
this.measurements = newMeasurements;
this.dataTypes = newDataTypes;
this.measurementSchemas = newMeasurementSchemas;
this.columns = newColumns;
this.bitMaps = newBitMaps;
return this;
}

@Override
public long getMinTime() {
return times[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,6 @@ public List<String> getFailedMeasurements() {
.collect(Collectors.toList());
}

public List<Exception> getFailedExceptions() {
return failedMeasurementIndex2Info == null
? Collections.emptyList()
: failedMeasurementIndex2Info.values().stream()
.map(info -> info.cause)
.collect(Collectors.toList());
}

public List<String> getFailedMessages() {
return failedMeasurementIndex2Info == null
? Collections.emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,8 @@ private boolean insertTabletToTsFileProcessor(
}

try {
tsFileProcessor.insertTablet(insertTabletNode, start, end, results);
tsFileProcessor.insertTablet(
insertTabletNode.reconstructWithoutFailedMeasurements(), start, end, results);
} catch (WriteProcessRejectException e) {
logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
return false;
Expand Down Expand Up @@ -1119,7 +1120,7 @@ private void insertToTsFileProcessor(
return;
}
long[] costsForMetrics = new long[4];
tsFileProcessor.insert(insertRowNode, costsForMetrics);
tsFileProcessor.insert(insertRowNode.reconstructWithoutFailedMeasurements(), costsForMetrics);
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
Expand Down Expand Up @@ -1182,7 +1183,8 @@ private void insertToTsFileProcessors(
}
tsFileProcessorMapForFlushing.put(tsFileProcessor, areSequence[i]);
try {
tsFileProcessor.insert(insertRowNode, costsForMetrics);
tsFileProcessor.insert(
insertRowNode.reconstructWithoutFailedMeasurements(), costsForMetrics);
} catch (WriteProcessException e) {
insertRowsNode.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
Expand Down