[HUDI-4298] When reading the mor table with QUERY_TYPE_SNAPSHOT, Unabl… (#5937)

* [HUDI-4298] Add test case for reading mor table

Signed-off-by: LinMingQiang <1356469429@qq.com>
LinMingQiang committed Jul 12, 2022
1 parent a270eee commit 994c561
Showing 7 changed files with 179 additions and 65 deletions.
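For context, the read path this commit targets is Flink's snapshot query (QUERY_TYPE_SNAPSHOT) on a MERGE_ON_READ table. Below is a minimal sketch of how such a read is configured, reusing the FlinkOptions keys that appear elsewhere in this diff; the QUERY_TYPE / QUERY_TYPE_SNAPSHOT names are assumed from current Hudi releases and are not taken from this commit.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;

public class MorSnapshotReadConfSketch {
  // Build a read configuration for a MOR table in snapshot mode.
  // FlinkOptions.QUERY_TYPE and FlinkOptions.QUERY_TYPE_SNAPSHOT are assumed option names.
  static Configuration morSnapshotReadConf(String basePath) {
    Configuration conf = new Configuration();
    conf.set(FlinkOptions.PATH, basePath);
    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
    conf.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_SNAPSHOT);
    return conf;
  }
}
```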
@@ -66,6 +66,10 @@ public static HoodieTableMetaClient init(String basePath, HoodieTableType tableT
return init(getDefaultHadoopConf(), basePath, tableType);
}

public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, Properties properties) throws IOException {
return init(getDefaultHadoopConf(), basePath, tableType, properties);
}

public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable) throws IOException {
Properties props = new Properties();
props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);
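A possible caller of the new Properties-accepting overload above: passing table-level config (here, a payload class) at init time, much as the bootstrap variant seeds BOOTSTRAP_BASE_PATH. The enclosing test-utils class is not named in this excerpt, so HoodieTestUtils and the PAYLOAD_CLASS_NAME key are assumptions for illustration, not facts from this commit.

```java
import java.io.IOException;
import java.util.Properties;

import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestUtils;

public class InitWithPropertiesSketch {
  // Assumes the init(...) overloads shown above live in HoodieTestUtils and that
  // HoodieTableConfig exposes PAYLOAD_CLASS_NAME alongside BOOTSTRAP_BASE_PATH.
  static HoodieTableMetaClient initMorTable(String basePath) throws IOException {
    Properties props = new Properties();
    props.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), EventTimeAvroPayload.class.getName());
    return HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, props);
  }
}
```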
Expand Up @@ -263,7 +263,7 @@ private void testWriteToHoodie(
client.getJobExecutionResult().get();
}

-    TestData.checkWrittenFullData(tempFile, expected);
+    TestData.checkWrittenDataCOW(tempFile, expected);
}

private void testWriteToHoodieWithCluster(
@@ -327,7 +327,7 @@ private void testWriteToHoodieWithCluster(
// wait for the streaming job to finish
client.getJobExecutionResult().get();

-    TestData.checkWrittenFullData(tempFile, expected);
+    TestData.checkWrittenDataCOW(tempFile, expected);
}

public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
@@ -449,7 +449,7 @@ public void testHoodiePipelineBuilderSink() throws Exception {
builder.sink(dataStream, false);

execute(execEnv, true, "Api_Sink_Test");
-    TestData.checkWrittenFullData(tempFile, EXPECTED);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED);
}

}
@@ -250,7 +250,7 @@ public void testInsertAppendMode() throws Exception {
.checkpoint(2)
.assertNextEvent()
.checkpointComplete(2)
-        .checkWrittenFullData(EXPECTED5)
+        .checkWrittenDataCOW(EXPECTED5)
.end();
}

@@ -282,7 +282,7 @@ public void testInsertClustering() throws Exception {
.checkpoint(2)
.handleEvents(2)
.checkpointComplete(2)
-        .checkWrittenFullData(EXPECTED5)
+        .checkWrittenDataCOW(EXPECTED5)
.end();
}

@@ -305,7 +305,7 @@ public void testInsertAsyncClustering() throws Exception {
.checkpoint(2)
.handleEvents(1)
.checkpointComplete(2)
-        .checkWrittenFullData(EXPECTED5)
+        .checkWrittenDataCOW(EXPECTED5)
.end();
}

@@ -18,12 +18,15 @@

package org.apache.hudi.sink;

import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.utils.TestData;

import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.HashMap;
import java.util.Map;
@@ -84,6 +87,60 @@ public void testIndexStateBootstrapWithCompactionScheduled() throws Exception {
validateIndexLoaded();
}

@Test
public void testEventTimeAvroPayloadMergeRead() throws Exception {
conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
conf.set(FlinkOptions.OPERATION, "upsert");
conf.set(FlinkOptions.CHANGELOG_ENABLED, false);
conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 2);
conf.set(FlinkOptions.PRE_COMBINE, true);
conf.set(FlinkOptions.PRECOMBINE_FIELD, "ts");
conf.set(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
HashMap<String, String> mergedExpected = new HashMap<>(EXPECTED1);
mergedExpected.put("par1", "[id1,par1,id1,Danny,22,4,par1, id2,par1,id2,Stephen,33,2,par1]");
TestHarness.instance().preparePipeline(tempFile, conf)
.consume(TestData.DATA_SET_INSERT)
.emptyEventBuffer()
.checkpoint(1)
.assertNextEvent()
.checkpointComplete(1)
.checkWrittenData(EXPECTED1, 4)
.consume(TestData.DATA_SET_DISORDER_INSERT)
.emptyEventBuffer()
.checkpoint(2)
.assertNextEvent()
.checkpointComplete(2)
.checkWrittenData(mergedExpected, 4)
.consume(TestData.DATA_SET_SINGLE_INSERT)
.emptyEventBuffer()
.checkpoint(3)
.assertNextEvent()
.checkpointComplete(3)
.checkWrittenData(mergedExpected, 4)
.end();
}
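The test above feeds DATA_SET_DISORDER_INSERT and then a single insert that leaves the checked data unchanged, i.e. for each key the row with the largest "ts" survives in mergedExpected. A minimal, self-contained sketch of that event-time precombine rule follows; it is illustrative only and not the actual EventTimeAvroPayload implementation.

```java
import org.apache.avro.generic.GenericRecord;

public class EventTimeMergeSketch {
  // Keep the record with the larger value of the ordering field ("ts" in the test above),
  // so out-of-order or late records do not overwrite newer data.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static GenericRecord merge(GenericRecord current, GenericRecord incoming, String orderingField) {
    Comparable currentTs = (Comparable) current.get(orderingField);
    Comparable incomingTs = (Comparable) incoming.get(orderingField);
    return incomingTs.compareTo(currentTs) >= 0 ? incoming : current;
  }
}
```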

@ParameterizedTest
@ValueSource(ints = {1, 2})
public void testOnlyBaseFileOrOnlyLogFileRead(int compactionDeltaCommits) throws Exception {
conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
conf.set(FlinkOptions.OPERATION, "upsert");
conf.set(FlinkOptions.CHANGELOG_ENABLED, false);
conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits);
TestHarness.instance().preparePipeline(tempFile, conf)
.consume(TestData.DATA_SET_INSERT)
.emptyEventBuffer()
.checkpoint(1)
.assertNextEvent()
.checkpointComplete(1)
.checkWrittenData(EXPECTED1, 4)
.end();
}
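In the parameterized test above, COMPACTION_DELTA_COMMITS = 1 presumably leaves the single delta commit compacted into a base file, while 2 means compaction never triggers and the data sits only in log files, so the snapshot read must handle a file slice with only one of the two. Below is a small, self-contained sketch of the per-slice merge a snapshot view performs, covering both degenerate cases; it is illustrative and not Hudi's actual reader.

```java
import java.util.HashMap;
import java.util.Map;

public class MorSnapshotMergeSketch {
  // Merge one file slice: start from base-file records (may be empty for a log-only slice)
  // and overlay log-file records (may be empty after compaction), keyed by record key.
  static Map<String, String> snapshotView(Map<String, String> baseFileRecords,
                                          Map<String, String> logFileRecords) {
    Map<String, String> merged = new HashMap<>(baseFileRecords);
    merged.putAll(logFileRecords); // log records are newer, so they win per key
    return merged;
  }
}
```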

@Override
public void testInsertClustering() {
// insert clustering is only valid for cow table.
@@ -161,7 +161,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {

env.execute("flink_hudi_compaction");
writeClient.close();
-    TestData.checkWrittenFullData(tempFile, EXPECTED1);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
}

@ParameterizedTest
@@ -202,7 +202,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce

asyncCompactionService.shutDown();

-    TestData.checkWrittenFullData(tempFile, EXPECTED2);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
}

@ParameterizedTest
@@ -281,7 +281,7 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel

env.execute("flink_hudi_compaction");
writeClient.close();
-    TestData.checkWrittenFullData(tempFile, EXPECTED3);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED3);
}

private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
@@ -22,20 +22,14 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;

import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.table.data.RowData;
@@ -338,9 +332,7 @@ public TestHarness checkWrittenData(Map<String, String> expected) throws Excepti
public TestHarness checkWrittenData(
Map<String, String> expected,
int partitions) throws Exception {
-    if (OptionsResolver.isCowTable(conf)
-        || conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
-        || OptionsResolver.isAppendMode(conf)) {
+    if (OptionsResolver.isCowTable(conf)) {
TestData.checkWrittenData(this.baseFile, expected, partitions);
} else {
checkWrittenDataMor(baseFile, expected, partitions);
@@ -349,15 +341,12 @@ public TestHarness checkWrittenData(
}

private void checkWrittenDataMor(File baseFile, Map<String, String> expected, int partitions) throws Exception {
-    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, HadoopConfigurations.getHadoopConf(conf));
-    Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
-    String latestInstant = lastCompleteInstant();
    FileSystem fs = FSUtils.getFs(basePath, new org.apache.hadoop.conf.Configuration());
-    TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
+    TestData.checkWrittenDataMOR(fs, baseFile, expected, partitions);
}

-  public TestHarness checkWrittenFullData(Map<String, List<String>> expected) throws IOException {
-    TestData.checkWrittenFullData(this.baseFile, expected);
+  public TestHarness checkWrittenDataCOW(Map<String, List<String>> expected) throws IOException {
+    TestData.checkWrittenDataCOW(this.baseFile, expected);
return this;
}
