Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions hudi-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@
<artifactId>orc-core</artifactId>
</dependency>

<!-- RoaringBitmap -->
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<!-- Httpcomponents -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Base64CodecUtil;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -82,4 +88,60 @@ public static Schema readLatestSchemaFromLogFiles(String basePath, List<HoodieLo
}
return null;
}

/**
 * Builds the encoded string for a list of long-typed record positions.
 * <p>
 * Every position is added to a fresh {@link Roaring64NavigableMap} bitmap, which is then
 * passed to {@link #encodePositions(Roaring64NavigableMap)}. That overload serializes the
 * bitmap using the portable format and applies the Base64 codec ({@link java.util.Base64}
 * in Java implementation) on the resulting bytes.
 *
 * @param positions A list of long-typed positions.
 * @return A string of Base64-encoded bytes ({@link java.util.Base64} in Java implementation)
 * generated from serializing the {@link Roaring64NavigableMap} bitmap holding the positions.
 * @throws IOException upon I/O error.
 */
public static String encodePositions(List<Long> positions) throws IOException {
  Roaring64NavigableMap bitmap = new Roaring64NavigableMap();
  for (long position : positions) {
    bitmap.add(position);
  }
  return encodePositions(bitmap);
}

/**
 * Serializes a {@link Roaring64NavigableMap} bitmap of record positions into a Base64 string.
 * <p>
 * {@code runOptimize()} is invoked on the given bitmap itself (no copy is made) before it is
 * written, using the portable format, into an in-memory buffer. The buffered bytes are then
 * run through the Base64 codec ({@link java.util.Base64} in Java implementation).
 *
 * @param positionBitmap {@link Roaring64NavigableMap} bitmap containing the record positions.
 * @return A string of Base64-encoded bytes ({@link java.util.Base64} in Java implementation)
 * generated from serializing the bitmap using the portable format.
 * @throws IOException upon I/O error.
 */
public static String encodePositions(Roaring64NavigableMap positionBitmap) throws IOException {
  positionBitmap.runOptimize();
  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  positionBitmap.serializePortable(new DataOutputStream(buffer));
  byte[] serialized = buffer.toByteArray();
  return Base64CodecUtil.encode(serialized);
}

/**
 * Decodes the {@link HeaderMetadataType#RECORD_POSITIONS} block header into record positions.
 * <p>
 * The Base64 string is decoded back into bytes, and those bytes are read, using the portable
 * format, into a newly created {@link Roaring64NavigableMap} bitmap.
 *
 * @param content A string of Base64-encoded bytes ({@link java.util.Base64} in Java
 *                implementation) generated from serializing {@link Roaring64NavigableMap}
 *                bitmap using the portable format.
 * @return A {@link Roaring64NavigableMap} bitmap containing the record positions in long type.
 * @throws IOException upon I/O error.
 */
public static Roaring64NavigableMap decodeRecordPositionsHeader(String content) throws IOException {
  byte[] serialized = Base64CodecUtil.decode(content);
  Roaring64NavigableMap decoded = new Roaring64NavigableMap();
  decoded.deserializePortable(new DataInputStream(new ByteArrayInputStream(serialized)));
  return decoded;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypeUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -122,6 +124,19 @@ public boolean isCompactedLogBlock() {
return logBlockHeader.containsKey(HeaderMetadataType.COMPACTED_BLOCK_TIMES);
}

/**
 * Returns the record positions stored in this block's header.
 *
 * @return The {@link Roaring64NavigableMap} bitmap decoded from the
 * {@link HeaderMetadataType#RECORD_POSITIONS} block header when that header is present;
 * otherwise, a new empty {@link Roaring64NavigableMap} bitmap.
 * @throws IOException upon I/O error.
 */
public Roaring64NavigableMap getRecordPositions() throws IOException {
  if (logBlockHeader.containsKey(HeaderMetadataType.RECORD_POSITIONS)) {
    String encodedPositions = logBlockHeader.get(HeaderMetadataType.RECORD_POSITIONS);
    return LogReaderUtils.decodeRecordPositionsHeader(encodedPositions);
  }
  // No positions header on this block: hand back an empty bitmap.
  return new Roaring64NavigableMap();
}

/**
* Type of the log block. WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
*/
Expand Down Expand Up @@ -153,7 +168,7 @@ public static HoodieLogBlockType fromId(String id) {
* new enums at the end.
*/
public enum HeaderMetadataType {
INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES
INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.common.table.log.TestLogReaderUtils;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
Expand Down Expand Up @@ -2693,13 +2695,29 @@ public void testDataBlockFormatAppendAndReadWithProjectedSchema(
}
}

/**
 * Verifies {@link HoodieLogBlock#getRecordPositions()} both with and without the
 * {@link HeaderMetadataType#RECORD_POSITIONS} block header.
 *
 * @param addRecordPositionsHeader whether the block header carries encoded record positions.
 * @throws IOException upon I/O error while encoding or decoding the positions.
 */
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testGetRecordPositions(boolean addRecordPositionsHeader) throws IOException {
  Map<HeaderMetadataType, String> header = new HashMap<>();
  // Stays empty when no header is added, which is exactly what the block must then report.
  List<Long> positions = new ArrayList<>();
  if (addRecordPositionsHeader) {
    positions = TestLogReaderUtils.generatePositions();
    String content = LogReaderUtils.encodePositions(positions);
    header.put(HeaderMetadataType.RECORD_POSITIONS, content);
  }
  HoodieLogBlock logBlock = new HoodieDeleteBlock(new DeleteRecord[0], header);
  // Assert unconditionally so the header-absent case checks for an empty bitmap
  // instead of passing without verifying anything.
  TestLogReaderUtils.assertPositionEquals(positions, logBlock.getRecordPositions());
}

private static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
Map<HeaderMetadataType, String> header) {
Map<HeaderMetadataType, String> header) {
return getDataBlock(dataBlockType, records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), header, new Path("dummy_path"));
}

private static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<HoodieRecord> records,
Map<HeaderMetadataType, String> header, Path pathForReader) {
Map<HeaderMetadataType, String> header, Path pathForReader) {
switch (dataBlockType) {
case CDC_DATA_BLOCK:
return new HoodieCDCDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.table.log;

import org.apache.hudi.common.util.FileIOUtils;

import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

/**
 * Tests for {@link LogReaderUtils}.
 */
public class TestLogReaderUtils {

  /**
   * Round trip: a list of positions is encoded, decoded, and compared with the input.
   */
  @Test
  public void testEncodeAndDecodePositions() throws IOException {
    List<Long> expected = generatePositions();
    String encoded = LogReaderUtils.encodePositions(expected);
    Roaring64NavigableMap decoded = LogReaderUtils.decodeRecordPositionsHeader(encoded);
    assertPositionEquals(expected, decoded);
  }

  /**
   * Round trip: a pre-built bitmap is encoded, decoded, and compared with the positions it was
   * filled from.
   */
  @Test
  public void testEncodeBitmapAndDecodePositions() throws IOException {
    List<Long> expected = generatePositions();
    Roaring64NavigableMap sourceBitmap = new Roaring64NavigableMap();
    for (long position : expected) {
      sourceBitmap.add(position);
    }
    String encoded = LogReaderUtils.encodePositions(sourceBitmap);
    Roaring64NavigableMap decoded = LogReaderUtils.decodeRecordPositionsHeader(encoded);
    assertPositionEquals(expected, decoded);
  }

  /**
   * Decodes a stored header resource and compares the result with the stored expected
   * positions, which are kept as a comma-separated list of longs.
   */
  @Test
  public void testCompatibilityOfDecodingPositions() throws IOException {
    String[] expectedTokens =
        readLastLineFromResourceFile("/format/expected_record_positions.data").split(",");
    List<Long> expected =
        Arrays.stream(expectedTokens).map(Long::parseLong).collect(Collectors.toList());
    String encoded = readLastLineFromResourceFile("/format/record_positions_header_v3.data");
    Roaring64NavigableMap decoded = LogReaderUtils.decodeRecordPositionsHeader(encoded);
    assertPositionEquals(expected, decoded);
  }

  /**
   * Generates 1000 distinct non-negative positions below 1_000_000_000_000 from a
   * fixed-seed {@link Random}.
   *
   * @return a new list holding the generated positions, in the iteration order of the
   * backing {@link HashSet}.
   */
  public static List<Long> generatePositions() {
    Random generator = new Random(0x2023);
    Set<Long> distinctPositions = new HashSet<>();
    do {
      long candidate = Math.abs(generator.nextLong() % 1_000_000_000_000L);
      distinctPositions.add(candidate);
    } while (distinctPositions.size() < 1000);
    return new ArrayList<>(distinctPositions);
  }

  /**
   * Asserts that the bitmap yields exactly the expected positions once those are sorted in
   * ascending order.
   *
   * @param expectedPositions     expected positions, in any order.
   * @param roaring64NavigableMap bitmap whose iteration is compared with the sorted positions.
   */
  public static void assertPositionEquals(List<Long> expectedPositions,
                                          Roaring64NavigableMap roaring64NavigableMap) {
    Iterator<Long> expectedIt =
        expectedPositions.stream().sorted().collect(Collectors.toList()).iterator();
    Iterator<Long> actualIt = roaring64NavigableMap.iterator();
    while (expectedIt.hasNext()) {
      if (!actualIt.hasNext()) {
        break;
      }
      assertEquals(expectedIt.next(), actualIt.next());
    }
    // Both sides must be exhausted together; a leftover on either side is a size mismatch.
    assertFalse(expectedIt.hasNext());
    assertFalse(actualIt.hasNext());
  }

  /**
   * Reads a classpath resource as UTF string lines and returns the last one.
   *
   * @param resourceName name of the resource, resolved against this test class.
   * @return the last line of the resource.
   * @throws IOException upon I/O error.
   */
  private String readLastLineFromResourceFile(String resourceName) throws IOException {
    try (InputStream resourceStream = TestLogReaderUtils.class.getResourceAsStream(resourceName)) {
      List<String> allLines = FileIOUtils.readAsUTFStringLines(resourceStream);
      int lastIndex = allLines.size() - 1;
      return allLines.get(lastIndex);
    }
  }
}
Loading