From 4099eaa7be03a26bd50122fe5a004feb0a0c066c Mon Sep 17 00:00:00 2001 From: Cpaulyz Date: Tue, 15 Nov 2022 16:14:06 +0800 Subject: [PATCH] done --- .../integration/sync/IoTDBSyncSenderIT.java | 2 +- .../db/sync/pipedata/DeletionPipeData.java | 2 +- .../iotdb/db/sync/pipedata/PipeData.java | 31 ++++++++++++++++--- .../db/sync/pipedata/TsFilePipeData.java | 2 +- .../pipedata/queue/BufferedPipeDataQueue.java | 2 +- .../transport/server/ReceiverManager.java | 2 +- 6 files changed, 31 insertions(+), 10 deletions(-) diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java index 159f42a4aa479..5baf7413fa4df 100644 --- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java +++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java @@ -268,7 +268,7 @@ private void checkResult(List resultString, List list) { int cnt = 0; for (String string : resultString) { for (PipeData pipeData : resultMap.get(string)) { - Assert.assertEquals(pipeData.getType(), list.get(cnt++).getType()); + Assert.assertEquals(pipeData.getPipeDataType(), list.get(cnt++).getPipeDataType()); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java index 0b03fcf43100f..4ebe06b34b450 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java @@ -54,7 +54,7 @@ public DeletionPipeData(String sgName, Deletion deletion, long serialNumber) { } @Override - public PipeDataType getType() { + public PipeDataType getPipeDataType() { return PipeDataType.DELETION; } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java index 18d41d2f354e9..fa9fcbbe3f84d 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java @@ -50,11 +50,11 @@ public void setSerialNumber(long serialNumber) { this.serialNumber = serialNumber; } - public abstract PipeDataType getType(); + public abstract PipeDataType getPipeDataType(); public long serialize(DataOutputStream stream) throws IOException { long serializeSize = 0; - stream.writeByte((byte) getType().ordinal()); + stream.writeByte(getPipeDataType().getType()); serializeSize += Byte.BYTES; stream.writeLong(serialNumber); serializeSize += Long.BYTES; @@ -74,7 +74,7 @@ public void deserialize(DataInputStream stream) throws IOException, IllegalPathE public static PipeData createPipeData(DataInputStream stream) throws IOException, IllegalPathException { PipeData pipeData; - PipeDataType type = PipeDataType.values()[stream.readByte()]; + PipeDataType type = PipeDataType.getPipeDataType(stream.readByte()); switch (type) { case TSFILE: pipeData = new TsFilePipeData(); @@ -98,7 +98,28 @@ public static PipeData createPipeData(byte[] bytes) throws IllegalPathException, public abstract ILoader createLoader(); public enum PipeDataType { - TSFILE, - DELETION, + TSFILE((byte) 0), + DELETION((byte) 1); + + private final byte type; + + PipeDataType(byte type) { + this.type = type; + } + + public byte getType() { + return type; + } + + public static PipeDataType getPipeDataType(byte type) { + switch (type) { + case 0: + return PipeDataType.TSFILE; + case 1: + return PipeDataType.DELETION; + default: + throw new IllegalArgumentException("Invalid input: " + type); + } + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java index 82de9efc5414f..9eaf7128a4506 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java @@ -121,7 +121,7 @@ public String getStorageGroupName() { } @Override - public PipeDataType getType() { + public PipeDataType getPipeDataType() { return PipeDataType.TSFILE; } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java index 4faaacd2322bf..dc3f67fe62d32 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java @@ -311,7 +311,7 @@ private void deletePipeData(long serialNumber) { if (commitData == null) { return; } - if (PipeData.PipeDataType.TSFILE.equals(commitData.getType())) { + if (PipeData.PipeDataType.TSFILE.equals(commitData.getPipeDataType())) { List tsFiles = ((TsFilePipeData) commitData).getTsFiles(false); for (File file : tsFiles) { Files.deleteIfExists(file.toPath()); diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java index 78bbd3194c151..3b174552ebc2e 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java @@ -242,7 +242,7 @@ public TSStatus transportPipeData(ByteBuffer buff) throws TException { logger.info( "Start load pipeData with serialize number {} and type {},value={}", pipeData.getSerialNumber(), - pipeData.getType(), + pipeData.getPipeDataType(), pipeData); try { pipeData.createLoader().load();