diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index b3e1d212041a5..092f3601c57ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -105,6 +105,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; @@ -256,6 +257,8 @@ public static PlanNode deserializeFromWAL(DataInputStream stream) throws IOExcep return InsertRowsNode.deserializeFromWAL(stream); case 44: return DeleteDataNode.deserializeFromWAL(stream); + case 97: + return ContinuousSameSearchIndexSeparatorNode.deserializeFromWAL(stream); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } @@ -272,6 +275,8 @@ public static PlanNode deserializeFromWAL(ByteBuffer buffer) { return InsertRowsNode.deserializeFromWAL(buffer); case 44: return DeleteDataNode.deserializeFromWAL(buffer); + case 97: + return ContinuousSameSearchIndexSeparatorNode.deserializeFromWAL(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } @@ -470,6 +475,9 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) { return ActiveRegionScanMergeNode.deserialize(buffer); case 96: return DeviceSchemaFetchScanNode.deserialize(buffer); + case 97: + throw new UnsupportedOperationException( + "You should never see ContinuousSameSearchIndexSeparatorNode in this function, because ContinuousSameSearchIndexSeparatorNode should never be used in network transmission."); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ContinuousSameSearchIndexSeparatorNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ContinuousSameSearchIndexSeparatorNode.java index 78bf312bf9ebe..521f720be7899 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ContinuousSameSearchIndexSeparatorNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ContinuousSameSearchIndexSeparatorNode.java @@ -19,15 +19,35 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + /** * For IoTConsensus sync. See github pull * request for details. */ -public class ContinuousSameSearchIndexSeparatorNode implements WALEntryValue { +public class ContinuousSameSearchIndexSeparatorNode extends SearchNode implements WALEntryValue { + + public ContinuousSameSearchIndexSeparatorNode() { + super(new PlanNodeId("")); + } + + public ContinuousSameSearchIndexSeparatorNode(PlanNodeId id) { + super(id); + this.searchIndex = -1; + } @Override public void serializeToWAL(IWALByteBufferView buffer) { @@ -40,4 +60,65 @@ public void serializeToWAL(IWALByteBufferView buffer) { public int serializedSize() { return Short.BYTES + Long.BYTES; } + + public static ContinuousSameSearchIndexSeparatorNode deserializeFromWAL(DataInputStream stream) + throws IOException { + long ignored = stream.readLong(); + return new ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("")); + } + + public static ContinuousSameSearchIndexSeparatorNode deserializeFromWAL(ByteBuffer buffer) { + long ignored = buffer.getLong(); + return new ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("")); + } + + // region all operations below are unsupported + + private static final String UNSUPPORTED_MESSAGE = + "ContinuousSameSearchIndexSeparatorNode not support this operation"; + + @Override + public TRegionReplicaSet getRegionReplicaSet() { + throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE); + } + + @Override + public List getChildren() { + return null; + } + + @Override + public void addChild(PlanNode child) { + throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE); + } + + @Override + public PlanNode clone() { + throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE); + } + + @Override + public int allowedChildCount() { + return 0; + } + + @Override + public List getOutputColumnNames() { + throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE); + } + + @Override + public List splitByPartition(IAnalysis analysis) { + throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java index f9ea8fdb33397..86bd63716a45f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java @@ -122,6 +122,9 @@ public static WALEntry deserialize(DataInputStream stream) throws IOException { case DELETE_DATA_NODE: value = (DeleteDataNode) PlanNodeType.deserializeFromWAL(stream); break; + case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE: + value = (ContinuousSameSearchIndexSeparatorNode) PlanNodeType.deserializeFromWAL(stream); + break; default: throw new RuntimeException("Unknown WALEntry type " + type); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java index c7af6e2e51856..dbcca1a77c1e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java @@ -216,6 +216,9 @@ public void redoLog(WALEntry walEntry) { case DELETE_DATA_NODE: walRedoer.redoDelete((DeleteDataNode) walEntry.getValue()); break; + case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE: + // The CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE doesn't need redo + break; default: throw new RuntimeException("Unsupported type " + walEntry.getType()); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/ContinuousSameSearchIndexSeparatorNodeSerDeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/ContinuousSameSearchIndexSeparatorNodeSerDeTest.java new file mode 100644 index 0000000000000..ec77a59da2844 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/ContinuousSameSearchIndexSeparatorNodeSerDeTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.node.write; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.nio.ByteBuffer; + +public class ContinuousSameSearchIndexSeparatorNodeSerDeTest { + @Test + public void testSerializeAndDeserializeForWAL() throws Exception { + ContinuousSameSearchIndexSeparatorNode node = + new ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("???")); + + int serializedSize = node.serializedSize(); + + byte[] bytes = new byte[serializedSize]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + + node.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + + Assert.assertEquals( + PlanNodeType.CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR.getNodeType(), + dataInputStream.readShort()); + + ContinuousSameSearchIndexSeparatorNode.deserializeFromWAL(dataInputStream); + } +}