
[HUDI-8003] Add OverwriteWithLatestHiveRecordMerger and restructure existing merger classes#11649

Merged
yihua merged 13 commits into apache:master from jonvex:add_overwrite_hive_payload
Sep 11, 2024
Conversation


@jonvex jonvex commented Jul 18, 2024

Change Logs

The overwrite payload for Hive is not implemented, so an exception is thrown when it is the chosen record merger strategy.

This PR ended up expanding in scope significantly. Here are the other changes:

  • Implement TestHoodieFileGroupReaderOnHive, the Hive implementation of TestHoodieFileGroupReaderBase
  • Rename the Spark record mergers so the naming makes more sense and is consistent with Hive
    • OverwriteWithLatestSparkMerger -> OverwriteWithLatestSparkRecordMerger
    • HoodieSparkRecordMerger is now an abstract class that all Spark mergers extend; the implementation moved to DefaultSparkRecordMerger
    • The same applies to Hive: HoodieHiveRecordMerger is now an abstract base class and DefaultHiveRecordMerger holds the implementation
  • Refactor HiveHoodieReaderContext so it no longer takes FileSlice-specific inputs
    • The reader context can hold some state, but when it is initialized, it shouldn't be specific to any file group
  • Remove the table -> ObjectInspector cache. This may hurt performance or may be negligible, but it is better for ensuring correctness
  • In HoodieRealtimeRecordReaderUtils, enums now convert to BytesWritable (Hadoop's byte-array class) instead of Text (Hadoop's string class).
    • Parquet stores enums as byte arrays. During testing of the non-FGReader implementation, I found that for a file group with only a base file and no log files, an enum would be read as BytesWritable, but for a file group with a base file and log files it would be read as Text. This is because merging is done in Avro and the result is then converted to ArrayWritable (the file group reader merges using the engine-native representation). Hadoop/Hive seems to treat enums as strings, but since byte arrays haven't caused any issues, it is better to unify everything as byte arrays.
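The merger restructuring described above can be sketched as a small class hierarchy. This is a simplified illustration only: the real Hudi classes implement HoodieRecordMerger and contain the actual merge logic, and the strategy strings here are placeholder values, not the real UUID constants.

```java
// Simplified sketch of the restructuring: an abstract engine-specific base
// class with concrete Default and OverwriteWithLatest implementations.
// Class names mirror the PR; bodies and strategy strings are stand-ins.
abstract class HoodieSparkRecordMerger {
    // shared Spark-specific plumbing would live in the base class
    abstract String getMergingStrategy();
}

class DefaultSparkRecordMerger extends HoodieSparkRecordMerger {
    @Override
    String getMergingStrategy() {
        return "EVENT_TIME_BASED_MERGE_STRATEGY"; // placeholder value
    }
}

class OverwriteWithLatestSparkRecordMerger extends HoodieSparkRecordMerger {
    @Override
    String getMergingStrategy() {
        return "OVERWRITE_MERGE_STRATEGY"; // placeholder value
    }
}
```

The Hive side follows the same pattern, with HoodieHiveRecordMerger as the abstract base and DefaultHiveRecordMerger / OverwriteWithLatestHiveRecordMerger as the concrete classes.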

Impact

  • Implements an additional record merger for Hive
  • Better code quality for the file group reader on Hive
  • Less risk of correctness issues in the Hive file group reader
  • Better, consistent naming for record mergers
  • More testing for the file group reader on Hive
  • Consistent type for enums

Risk level (write none, low medium or high below)

Medium. Added extra testing for the Hive record reader.

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:S PR with lines of changes in (10, 100] label Jul 18, 2024
@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:S PR with lines of changes in (10, 100] labels Jul 23, 2024
@jonvex jonvex requested a review from yihua July 23, 2024 19:58
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
Contributor Author (jonvex):
needed for NetworkTestUtils


@Override
@Disabled
public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception {}
Contributor Author (jonvex):

This test was not working because base files were being written. Maybe we need a followup ticket for this so we can land what we have.

@@ -89,41 +84,17 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
private final String recordKeyField;

protected HiveHoodieReaderContext(HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator,
Contributor Author (jonvex):

The reader context shouldn't be initialized with things specific to a particular file group. Got rid of some unused inputs, and moved the calculations for things like table name, partition columns, and record key field into the reader, so this class doesn't have big dependencies on things like the metaclient.
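The shape of this refactor can be sketched as follows. This is a simplified illustration with stand-in types and fields, not the real signatures: the context keeps only query-scoped state, while file-group-specific values live in the per-split reader.

```java
// Simplified sketch: the reader context holds only query-scoped state,
// while per-file-group details stay in the reader that uses it.
// Types and fields are illustrative stand-ins, not the real Hudi classes.
class HiveReaderContextSketch {
    final String tableName;       // query-level, same for every file group
    final String recordKeyField;  // derived once from table config

    HiveReaderContextSketch(String tableName, String recordKeyField) {
        this.tableName = tableName;
        this.recordKeyField = recordKeyField;
        // note: no FileSlice or per-file-group schema passed in here
    }
}

class FileGroupReaderSketch {
    final HiveReaderContextSketch context; // shared across file groups
    final String fileGroupId;              // per-file-group state lives here

    FileGroupReaderSketch(HiveReaderContextSketch context, String fileGroupId) {
        this.context = context;
        this.fileGroupId = fileGroupId;
    }
}
```

With this split, one context instance can serve every file group in a query, which is the property the refactor is after.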

* If populate meta fields is false, then getRecordKeyFields()
* should return exactly 1 recordkey field.
*/
private static String getRecordKeyField(HoodieTableMetaClient metaClient) {
Contributor Author (jonvex):

moved to filegroup reader record reader

public ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath filePath, long start, long length, Schema dataSchema, Schema requiredSchema, HoodieStorage storage) throws IOException {
-  JobConf jobConfCopy = new JobConf(jobConf);
+  JobConf jobConfCopy = new JobConf(storage.getConf().unwrapAs(Configuration.class));
if (getNeedsBootstrapMerge()) {
Contributor Author (jonvex):

took this from parquet input format

* should return exactly 1 recordkey field.
*/
@VisibleForTesting
static String getRecordKeyField(HoodieTableMetaClient metaClient) {
Contributor Author (jonvex):

took these from the reader context

- * Convert FileSplit to FileSlice, but save the locations in 'hosts' because that data is otherwise lost.
+ * Convert FileSplit to FileSlice
*/
private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String, String[]> hosts, FileSystem fs, String tableBasePath) throws IOException {
Contributor Author (jonvex) commented Jul 24, 2024:

got rid of hosts, will create a followup ticket to allow this info and other engine specific file things to get passed naturally through the filegroup structure

return super.getRecordReader(split, job, reporter);
}
if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) {
return new HoodieFileGroupReaderBasedRecordReader((s, j, r) -> {
Contributor Author (jonvex):

reporter was just passed a few levels down and then used without any modification. We can just pass it directly here and simplify the code

private static final Cache<String, ObjectInspectorCache>
OBJECT_INSPECTOR_TABLE_CACHE = Caffeine.newBuilder().maximumSize(1000).build();

public static ObjectInspectorCache getCacheForTable(String table, Schema tableSchema, JobConf jobConf) {
Contributor Author (jonvex):

We weren't actually storing anything in OBJECT_INSPECTOR_TABLE_CACHE. Making this work as intended might have some issues: what if there are two tables with the same name but different locations? Schema evolution might also be a problem. We probably don't want to persist the cache between queries; within a single query we just don't want to recompute each time. Or maybe the overhead of a global cache is more than the computation? How can we find this out?
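One hypothetical way to address the same-name concern raised above (a sketch, not the PR's code) is to key the cache by the table's base path rather than its name, so two tables that share a name but live at different locations get separate entries. ObjectInspectorCache here is a stand-in for the real class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: an object-inspector-style cache keyed by table
// base path instead of table name. Not the PR's implementation.
class InspectorCacheByPath {
    static class ObjectInspectorCache {
        final String basePath;
        ObjectInspectorCache(String basePath) { this.basePath = basePath; }
    }

    private final Map<String, ObjectInspectorCache> cache = new ConcurrentHashMap<>();

    ObjectInspectorCache getForTable(String basePath) {
        // computeIfAbsent avoids recomputation within a query while still
        // distinguishing same-named tables at different locations
        return cache.computeIfAbsent(basePath, ObjectInspectorCache::new);
    }

    void reset() {
        cache.clear(); // mirrors the PR's resetCache() used between tests
    }
}
```

This would not by itself solve schema evolution or cross-query staleness; those would still need the followup investigation the comment describes.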

}

@VisibleForTesting
public static void resetCache() {
Contributor Author (jonvex):

needed to add this because now that we cache things, there were conflicts between tests

@yihua yihua changed the title from "[HUDI-8003] add hive overwrite payload" to "[HUDI-8003] Add hive overwrite payload" on Jul 24, 2024
<artifactId>hudi-io</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
Contributor:

Is <type>test-jar</type> needed?

Contributor Author (jonvex):

will come back to this when all other comments are addressed

Comment on lines +147 to +158
@Override
public String getRecordPayloadForMergeMode(RecordMergeMode mergeMode) {
switch (mergeMode) {
case EVENT_TIME_ORDERING:
return DefaultHoodieRecordPayload.class.getName();
case OVERWRITE_WITH_LATEST:
return OverwriteWithLatestAvroPayload.class.getName();
case CUSTOM:
default:
return CustomPayloadForTesting.class.getName();
}
}
Contributor:

Could we get rid of this? The payload should be automatically inferred if needed.


Comment on lines +183 to +190
if (RecordMergeMode.valueOf(writeConfigs.get("hoodie.record.merge.mode")).equals(RecordMergeMode.OVERWRITE_WITH_LATEST)) {
recordMergerStrategy = HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
} else if (RecordMergeMode.valueOf(writeConfigs.get("hoodie.record.merge.mode")).equals(RecordMergeMode.EVENT_TIME_ORDERING)) {
recordMergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
} else if (RecordMergeMode.valueOf(writeConfigs.get("hoodie.record.merge.mode")).equals(RecordMergeMode.CUSTOM)) {
//match the behavior of spark for now, but this should be a config
recordMergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
}
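For reference, the repeated RecordMergeMode.valueOf lookups in the quoted code could be collapsed into a single parse plus a switch. This is a self-contained sketch: the UUID constants below are placeholders standing in for HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID and DEFAULT_MERGER_STRATEGY_UUID, and the enum is a local stand-in for Hudi's RecordMergeMode.

```java
// Sketch of the same mapping written as a switch; the constants and the
// enum are illustrative stand-ins for the real Hudi values.
class MergeStrategyResolver {
    enum RecordMergeMode { EVENT_TIME_ORDERING, OVERWRITE_WITH_LATEST, CUSTOM }

    static final String OVERWRITE_MERGER_STRATEGY_UUID = "overwrite-strategy-uuid"; // placeholder
    static final String DEFAULT_MERGER_STRATEGY_UUID = "default-strategy-uuid";     // placeholder

    static String resolve(String mergeModeConfig) {
        RecordMergeMode mode = RecordMergeMode.valueOf(mergeModeConfig);
        switch (mode) {
            case OVERWRITE_WITH_LATEST:
                return OVERWRITE_MERGER_STRATEGY_UUID;
            case EVENT_TIME_ORDERING:
            case CUSTOM: // match Spark's behavior for now; should become a config
            default:
                return DEFAULT_MERGER_STRATEGY_UUID;
        }
    }
}
```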
Contributor:

We can remove this part of code.

Contributor Author (jonvex) commented Aug 12, 2024:

If I get rid of it, this is thrown:

Exception:
java.lang.IllegalArgumentException: Record merger strategy (null) should be consistent with the record merging mode OVERWRITE_WITH_LATEST

	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
	at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.validateMergeConfigs(HoodieTableMetaClient.java:1470)
	at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.build(HoodieTableMetaClient.java:1307)
	at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.initTable(HoodieTableMetaClient.java:1412)
	at org.apache.hudi.hadoop.TestHoodieFileGroupReaderOnHive.commitToTable(TestHoodieFileGroupReaderOnHive.java:199)
	at org.apache.hudi.common.table.read.TestHoodieFileGroupReaderBase.testReadFileGroupInMergeOnReadTable(TestHoodieFileGroupReaderBase.java:186)
	... (remaining JUnit, Java stream, and IntelliJ runner frames omitted)

Contributor:

OK let's tackle this problem in a separate PR.

Comment on lines 285 to 286
//TODO: validate complex types
continue;
Contributor:

Is direct comparison not working?

Contributor Author (jonvex):

No, direct comparison just compares references, so the values have to be compared element by element.
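This is standard Java array semantics: `equals` on an array is reference equality, so element-wise comparison (e.g. `Arrays.deepEquals`) is needed. A plain-Java illustration:

```java
import java.util.Arrays;

// Why direct comparison fails for ArrayWritable-style wrappers:
// arrays compare by reference, not by contents.
class ArrayEqualityDemo {
    static boolean directCompare(Object[] a, Object[] b) {
        return a.equals(b); // reference comparison only
    }

    static boolean deepCompare(Object[] a, Object[] b) {
        return Arrays.deepEquals(a, b); // element-wise, recursing into nested arrays
    }
}
```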

@github-actions github-actions bot added size:XL PR with lines of changes > 1000 and removed size:L PR with lines of changes in (300, 1000] labels Aug 12, 2024
@jonvex jonvex requested a review from yihua August 12, 2024 20:44
@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:XL PR with lines of changes > 1000 labels Aug 19, 2024
@jonvex jonvex requested a review from yihua September 10, 2024 18:16
@github-actions github-actions bot added size:XL PR with lines of changes > 1000 and removed size:L PR with lines of changes in (300, 1000] labels Sep 10, 2024
@apache apache deleted a comment from hudi-bot Sep 11, 2024
@yihua yihua (Contributor) left a comment:

LGTM

assertInstanceOf(ArrayWritable.class, expected);
assertInstanceOf(ArrayWritable.class, actual);
//adjust for fake partition
int expectedLen = ((ArrayWritable) expected).get().length - (ignoreOneExtraCol ? 1 : 0);
Contributor:

What is the fake partition?

Contributor Author (jonvex):

See https://hudi.apache.org/docs/docker_demo#step-3-sync-with-hive for an example: when we do Hive sync in the docker demo, we set --partitioned-by dt even though dt is not an actual column in the table.

@hudi-bot (Collaborator):

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure : re-run the last Azure build

@yihua yihua changed the title from "[HUDI-8003] Add hive overwrite payload" to "[HUDI-8003] Add OverwriteWithLatestHiveRecordMerger and restructure existing merger classes" on Sep 11, 2024
@yihua yihua merged commit 276133b into apache:master Sep 11, 2024

Labels

size:XL PR with lines of changes > 1000

3 participants