diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java index 29364b7bc313e..f1c89e1305c84 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java @@ -57,6 +57,8 @@ public abstract class TabletInsertionEventParser { private static final Logger LOGGER = LoggerFactory.getLogger(TabletInsertionEventParser.class); + private static final LocalDate EMPTY_LOCALDATE = LocalDate.of(1000, 1, 1); + protected final PipeTaskMeta pipeTaskMeta; // used to report progress protected final EnrichedEvent sourceEvent; // used to report progress and filter value columns by time range @@ -246,8 +248,12 @@ protected void parse(final InsertTabletNode insertTabletNode) { final BitMap bitMap = new BitMap(this.timestampColumn.length); if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnDataTypes[i])) { - this.valueColumns[filteredColumnIndex] = null; - bitMap.markAll(); + fillNullValue( + originValueColumnDataTypes[i], + this.valueColumns, + bitMap, + filteredColumnIndex, + rowIndexList.size()); } else { this.valueColumns[filteredColumnIndex] = filterValueColumnsByRowIndexList( @@ -348,8 +354,12 @@ protected void parse(final Tablet tablet, final boolean isAligned) { final BitMap bitMap = new BitMap(this.timestampColumn.length); if (Objects.isNull(originValueColumns[i]) || Objects.isNull(originValueColumnDataTypes[i])) { - this.valueColumns[filteredColumnIndex] = null; - bitMap.markAll(); + fillNullValue( + originValueColumnDataTypes[i], + this.valueColumns, + bitMap, + filteredColumnIndex, + rowIndexList.size()); } else { this.valueColumns[filteredColumnIndex] = filterValueColumnsByRowIndexList( @@ -449,7 +459,7 @@ private static Object filterValueColumnsByRowIndexList( for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { - valueColumns[i] = LocalDate.MIN; + valueColumns[i] = EMPTY_LOCALDATE; nullValueColumnBitmap.mark(i); } else { valueColumns[i] = dateValueColumns[rowIndexList.get(i)]; @@ -463,7 +473,7 @@ private static Object filterValueColumnsByRowIndexList( : (int[]) originValueColumn; for (int i = 0; i < rowIndexList.size(); ++i) { if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { - valueColumns[i] = LocalDate.MIN; + valueColumns[i] = EMPTY_LOCALDATE; nullValueColumnBitmap.mark(i); } else { valueColumns[i] = @@ -569,6 +579,47 @@ private static Object filterValueColumnsByRowIndexList( } } + private void fillNullValue( + final TSDataType type, + final Object[] valueColumns, + final BitMap nullValueColumnBitmap, + final int columnIndex, + final int rowSize) { + nullValueColumnBitmap.markAll(); + if (Objects.isNull(type)) { + return; + } + switch (type) { + case TIMESTAMP: + case INT64: + valueColumns[columnIndex] = new long[rowSize]; + break; + case INT32: + valueColumns[columnIndex] = new int[rowSize]; + break; + case DOUBLE: + valueColumns[columnIndex] = new double[rowSize]; + break; + case FLOAT: + valueColumns[columnIndex] = new float[rowSize]; + break; + case BOOLEAN: + valueColumns[columnIndex] = new boolean[rowSize]; + break; + case DATE: + valueColumns[columnIndex] = new LocalDate[rowSize]; + break; + case TEXT: + case BLOB: + case STRING: + valueColumns[columnIndex] = new Binary[rowSize]; + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", type)); + } + } + //////////////////////////// process //////////////////////////// public abstract List processRowByRow( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java index 1ba5ff680ec0b..7b1326f437f73 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java @@ -67,7 +67,7 @@ public void addTimeChunkToCache(String file, long offset, Chunk chunk) { chunk.getData(), chunk.getDeleteIntervalList(), chunk.getChunkStatistic(), - chunk.getDecryptor())); + chunk.getEncryptParam())); cachedTimeChunkSize += chunk.getHeader().getDataSize(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionValidationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionValidationTest.java index c066aef0fa680..172cd1d80d5c1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionValidationTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionValidationTest.java @@ -203,7 +203,7 @@ public void testOneUncompletedInMultiCompletedFiles2() throws IOException { writeOneFile(path); if (i == 5) { RandomAccessFile randomAccessFile = new RandomAccessFile(path, "rw"); - randomAccessFile.seek(randomAccessFile.length() - 100); + randomAccessFile.seek(randomAccessFile.length() - 130); randomAccessFile.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}); randomAccessFile.close(); } diff --git a/pom.xml b/pom.xml index dfd1c7171ae7e..42b1dd3554504 100644 --- a/pom.xml +++ b/pom.xml @@ -166,7 +166,7 @@ 0.14.1 1.9 1.5.6-3 - 1.2.0-1c9924b4-SNAPSHOT + 1.2.0-241106-SNAPSHOT