Skip to content
Permalink
Browse files
[IOTDB-3196] Add search index in InsertNode (#5945)
* add search index in insert node

* add index

* fix ci
  • Loading branch information
HeimingZ committed May 18, 2022
1 parent f78e90f commit 3e63945619e29bf19e45ec35752a13f6ad6ab4a2
Showing 7 changed files with 67 additions and 2 deletions.
@@ -116,6 +116,16 @@ public void addInsertTabletNode(InsertTabletNode node, Integer parentIndex) {
parentInsertTabletNodeIndexList.add(parentIndex);
}

@Override
public void setSearchIndex(long index) {
insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
}

@Override
public void setSafelyDeletedSearchIndex(long index) {
insertTabletNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
}

@Override
public boolean validateAndSetSchema(SchemaTree schemaTree) {
for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
@@ -44,6 +44,10 @@
import java.util.stream.Collectors;

public abstract class InsertNode extends WritePlanNode implements IConsensusRequest {
/** this insert node doesn't need to participate in multi-leader consensus */
public static final long NO_CONSENSUS_INDEX = -1;
/** no multi-leader consensus, all insert nodes can be safely deleted */
public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MAX_VALUE;

/**
* if use id table, this filed is id form of device path <br>
@@ -67,6 +71,14 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
*/
protected IDeviceID deviceID;

/** this index is used by wal search, its order should be protected by the upper layer */
protected long searchIndex = NO_CONSENSUS_INDEX;
/**
* this index pass info to wal, indicating that insert nodes whose search index are before this
* value can be deleted safely
*/
protected long safelyDeletedSearchIndex = DEFAULT_SAFELY_DELETED_SEARCH_INDEX;

/** Physical address of data region after splitting */
TRegionReplicaSet dataRegionReplicaSet;

@@ -139,6 +151,22 @@ public void setDeviceID(IDeviceID deviceID) {
this.deviceID = deviceID;
}

public long getSearchIndex() {
return searchIndex;
}

public void setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;
}

public long getSafelyDeletedSearchIndex() {
return safelyDeletedSearchIndex;
}

public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
}

/**
* Deserialize via {@link
* org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType#deserialize(ByteBuffer)}
@@ -153,6 +181,7 @@ protected void serializeAttributes(ByteBuffer byteBuffer) {
throw new NotImplementedException("serializeAttributes of InsertNode is not implemented");
}

// region Serialization methods for WAL
/** Serialized size of measurement schemas, ignoring failed time series */
protected int serializeMeasurementSchemasSize() {
int byteLen = 0;
@@ -187,6 +216,7 @@ protected void deserializeMeasurementSchemas(DataInputStream stream) throws IOEx
measurements[i] = measurementSchemas[i].getMeasurementId();
}
}
// endregion

public TRegionReplicaSet getRegionReplicaSet() {
return dataRegionReplicaSet;
@@ -422,7 +422,7 @@ public int serializedSize() {

private int subSerializeSize() {
int size = 0;
size += Long.BYTES;
size += Long.BYTES * 2;
size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
return size + serializeMeasurementsAndValuesSize();
}
@@ -478,6 +478,7 @@ public void serializeToWAL(IWALByteBufferView buffer) {
}

private void subSerialize(IWALByteBufferView buffer) {
buffer.putLong(searchIndex);
buffer.putLong(time);
WALWriteUtils.write(devicePath.getFullPath(), buffer);
serializeMeasurementsAndValues(buffer);
@@ -534,6 +535,7 @@ public static InsertRowNode deserialize(DataInputStream stream)
throws IOException, IllegalPathException {
// we do not store plan node id in wal entry
InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
insertNode.setSearchIndex(stream.readLong());
insertNode.setTime(stream.readLong());
insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(stream)));
insertNode.deserializeMeasurementsAndValues(stream);
@@ -86,6 +86,16 @@ public void addOneInsertRowNode(InsertRowNode node, int index) {
insertRowNodeIndexList.add(index);
}

@Override
public void setSearchIndex(long index) {
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
}

@Override
public void setSafelyDeletedSearchIndex(long index) {
insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
}

public Map<Integer, TSStatus> getResults() {
return results;
}
@@ -75,6 +75,16 @@ public Map<Integer, TSStatus> getResults() {
return results;
}

@Override
public void setSearchIndex(long index) {
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
}

@Override
public void setSafelyDeletedSearchIndex(long index) {
insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
}

public TSStatus[] getFailingStatus() {
return StatusUtils.getFailingStatus(results, insertRowNodeList.size());
}
@@ -537,6 +537,7 @@ public int serializedSize(int start, int end) {

int subSerializeSize(int start, int end) {
int size = 0;
size += Long.BYTES;
size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
// measurements size
size += Integer.BYTES;
@@ -612,6 +613,7 @@ public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {
}

void subSerialize(IWALByteBufferView buffer, int start, int end) {
buffer.putLong(searchIndex);
WALWriteUtils.write(devicePath.getFullPath(), buffer);
// data types are serialized in measurement schemas
writeMeasurementSchemas(buffer);
@@ -723,6 +725,7 @@ public static InsertTabletNode deserialize(DataInputStream stream)
}

private void subDeserialize(DataInputStream stream) throws IllegalPathException, IOException {
searchIndex = stream.readLong();
devicePath = new PartialPath(ReadWriteIOUtils.readString(stream));

int measurementSize = stream.readInt();
@@ -61,7 +61,7 @@ public void testSerializeAndDeserialize() throws IllegalPathException {
}

@Test
public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
public void testSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();

int serializedSize = insertTabletNode.serializedSize();

0 comments on commit 3e63945

Please sign in to comment.