[HUDI-3692] MetadataFileSystemView includes compaction in timeline (#…
YuweiXiao committed Mar 31, 2022
1 parent 4569734 commit ce45f7f
Showing 2 changed files with 57 additions and 1 deletion.
TestHoodieSparkMergeOnReadTableCompaction.java
@@ -21,6 +21,9 @@

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -43,19 +46,34 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;

@Tag("functional")
public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFunctionalTestHarness {

private static Stream<Arguments> writeLogTest() {
// enable metadata table, enable embedded timeline server
Object[][] data = new Object[][] {
{true, true},
{true, false},
{false, true},
{false, false}
};
return Stream.of(data).map(Arguments::of);
}

private HoodieTestDataGenerator dataGen;
private SparkRDDWriteClient client;
private HoodieTableMetaClient metaClient;
@@ -104,6 +122,44 @@ public void testWriteDuringCompaction() throws IOException {
Assertions.assertEquals(300, readTableTotalRecordsNum());
}

@ParameterizedTest
@MethodSource("writeLogTest")
public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(true)
.withEmbeddedTimelineServerEnabled(enableTimelineServer)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
client = getHoodieWriteClient(config);

final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);

// initialize 100 records
client.upsert(writeRecords, client.startCommit());
// update 100 records
client.upsert(writeRecords, client.startCommit());
// schedule compaction
client.scheduleCompaction(Option.empty());
// delete 50 records
List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
client.delete(deleteRecords, client.startCommit());
// insert the same 100 records again
client.upsert(writeRecords, client.startCommit());
Assertions.assertEquals(100, readTableTotalRecordsNum());
}

private long readTableTotalRecordsNum() {
return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(),
Arrays.stream(dataGen.getPartitionPaths()).map(p -> Paths.get(basePath(), p).toString()).collect(Collectors.toList()), basePath()).size();
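
The test's final assertion of 100 records follows from simple key bookkeeping across the four delta commits. A minimal sketch of that arithmetic (illustrative plain Java, not part of the commit and not Hudi API):

// Key bookkeeping for testWriteLogDuringCompaction (illustrative only).
public class RecordCountSketch {
public static void main(String[] args) {
int liveKeys = 0;
liveKeys += 100; // commit 1: upsert inserts 100 unique keys
liveKeys += 0; // commit 2: upsert of the same 100 keys updates them in place
// a compaction is scheduled here, so the later delta commits land as log files
liveKeys -= 50; // commit 3: delete removes 50 of the keys
liveKeys += 50; // commit 4: upserting all 100 keys re-inserts the 50 deleted ones
System.out.println(liveKeys == 100); // matches assertEquals(100, readTableTotalRecordsNum())
}
}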
FileSystemViewManager.java
@@ -162,7 +162,7 @@ private static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieMeta
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
if (metadataConfig.enabled()) {
ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. Cannot instantiate metadata file system view");
- return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+ return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(),
metadataSupplier.get());
}
return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
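
This one-line change is the substance of the fix. Previously the metadata-backed view was built from getCommitsTimeline().filterCompletedInstants(), which keeps only completed commits and so drops requested and inflight compaction instants; HoodieMetadataFileSystemView could therefore disagree with the in-memory HoodieTableFileSystemView, which already receives the filterCompletedAndCompactionInstants() timeline computed above. A minimal sketch contrasting the two calls, assuming a meta client for a table with a compaction that is scheduled but not yet complete (illustrative only, not part of the commit):

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

class TimelineChoiceSketch {
static void compare(HoodieTableMetaClient metaClient) {
// Old behavior: completed commits only; the pending compaction instant is filtered
// out, so the metadata view cannot account for the compaction plan.
HoodieTimeline commitsOnly =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();

// New behavior: completed instants plus pending compaction instants, matching the
// timeline already used for the in-memory view in this same method.
HoodieTimeline withCompaction =
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();

// With a compaction scheduled but unfinished, withCompaction reports one more instant.
System.out.println("commitsOnly: " + commitsOnly.countInstants());
System.out.println("withCompaction: " + withCompaction.countInstants());
}
}

Including the pending compaction instant is what lets writers in the test above keep appending log files against the correct file slice while the compaction plan is outstanding.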
