[HUDI-1160] Support update partial fields for CoW table #2386

Closed
wants to merge 5 commits into from

liujinhui1994
Contributor

What is the purpose of the pull request

Support partial-field updates for CoW (copy-on-write) tables.

Brief change log

Support partial-field updates for CoW tables. Currently, when updating a Hudi table, users have to provide all fields. In some cases, however, users cannot provide every field, so Hudi needs to support updating only a subset of the fields.

Introduce PartialUpdatePayload
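
To illustrate the intended semantics, here is a minimal sketch of a partial-field merge. It is not the PartialUpdatePayload code from this PR: it uses plain maps instead of Avro records, the class name PartialUpdateSketch and the method merge are hypothetical, and it assumes that a missing or null incoming field means "keep the stored value". The field names are taken from the trip schemas used in the tests.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not the PartialUpdatePayload from this PR.
class PartialUpdateSketch {

    // Overlay the incoming fields on the stored record.
    // Assumption: a null or absent incoming field keeps the stored value.
    static Map<String, Object> merge(Map<String, Object> stored, Map<String, Object> incoming) {
        Map<String, Object> merged = new LinkedHashMap<>(stored);
        for (Map.Entry<String, Object> entry : incoming.entrySet()) {
            if (entry.getValue() != null) {
                merged.put(entry.getKey(), entry.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Full record as it might already exist in the table.
        Map<String, Object> stored = new LinkedHashMap<>();
        stored.put("_row_key", "key-1");
        stored.put("timestamp", 1L);
        stored.put("driver", "driver-1");
        stored.put("fare", 10.0);

        // Partial update that carries only the key and a new timestamp.
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("_row_key", "key-1");
        incoming.put("timestamp", 2L);

        // driver and fare are carried over from the stored record.
        System.out.println(merge(stored, incoming));
    }
}
```

With this policy a caller cannot set a field back to null through a partial update; whether that is acceptable is a design choice the payload has to make explicitly.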

Verify this pull request

Added unit tests (UTs).

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@codecov-io
codecov-io commented Dec 28, 2020

Codecov Report

Merging #2386 (8b6aa52) into master (6cdf59d) will decrease coverage by 42.18%.
The diff coverage is n/a.

@@              Coverage Diff              @@
##             master    #2386       +/-   ##
=============================================
- Coverage     52.23%   10.04%   -42.19%     
+ Complexity     2662       48     -2614     
=============================================
  Files           335       52      -283     
  Lines         14981     1852    -13129     
  Branches       1506      223     -1283     
=============================================
- Hits           7825      186     -7639     
+ Misses         6533     1653     -4880     
+ Partials        623       13      -610     
| Flag | Coverage Δ | Complexity Δ |
| --- | --- | --- |
| hudicli | ? | ? |
| hudiclient | 100.00% <ø> (ø) | 0.00 <ø> (ø) |
| hudicommon | ? | ? |
| hudihadoopmr | ? | ? |
| huditimelineservice | ? | ? |
| hudiutilities | 10.04% <ø> (-59.62%) | 0.00 <ø> (ø) |

| Impacted Files | Coverage Δ | Complexity Δ |
| --- | --- | --- |
| ...va/org/apache/hudi/utilities/IdentitySplitter.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-2.00%) |
| ...va/org/apache/hudi/utilities/schema/SchemaSet.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-3.00%) |
| ...a/org/apache/hudi/utilities/sources/RowSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-4.00%) |
| .../org/apache/hudi/utilities/sources/AvroSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-1.00%) |
| .../org/apache/hudi/utilities/sources/JsonSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-1.00%) |
| ...rg/apache/hudi/utilities/sources/CsvDFSSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-10.00%) |
| ...g/apache/hudi/utilities/sources/JsonDFSSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-4.00%) |
| ...apache/hudi/utilities/sources/JsonKafkaSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-6.00%) |
| ...pache/hudi/utilities/sources/ParquetDFSSource.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-5.00%) |
| ...lities/schema/SchemaProviderWithPostProcessor.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-4.00%) |
... and 312 more

FIX test checkStyle
@liujinhui1994
Contributor Author

@leesf please review, thanks.

Comment on lines +67 to +70
List<HoodieRecord> inserts2 =
dataGen.generatePartialUpdateInsertsStream(commitTime2, 100, false, PARTIAL_TRIP_SCHEMA, false).collect(Collectors.toList()); // this writes ~500kb
List<HoodieKey> insertKeys2 = recordsToHoodieKeys(inserts2);
WriteStatus writeStatus = upsertAndCheck(client, insertKeys2, commitTime2, true);
Contributor

If users use this payload and new fields are added in later commits, what will happen? Would you please add a test for this case?
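
One way to make this case concrete in a test is to detect incoming fields that the table schema does not yet contain. The sketch below is only an illustration under stated assumptions: it compares plain lists of field names rather than Avro schemas, NewFieldCheckSketch and unknownFields are hypothetical names, and "tip" is a made-up field standing in for one added in a later commit. It does not describe what the payload in this PR actually does with such fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not part of this PR.
class NewFieldCheckSketch {

    // Returns the incoming field names that the table schema does not contain,
    // in the order they appear in the incoming record.
    static List<String> unknownFields(List<String> tableFields, List<String> incomingFields) {
        Set<String> known = new LinkedHashSet<>(tableFields);
        List<String> unknown = new ArrayList<>();
        for (String field : incomingFields) {
            if (!known.contains(field)) {
                unknown.add(field);
            }
        }
        return unknown;
    }

    public static void main(String[] args) {
        List<String> tableFields = Arrays.asList("timestamp", "_row_key", "driver", "fare");
        // "tip" is a made-up field that a later commit might introduce.
        List<String> incomingFields = Arrays.asList("_row_key", "fare", "tip");
        System.out.println(unknownFields(tableFields, incomingFields));
    }
}
```

A test along these lines could then assert the chosen behavior for the reported fields, for example rejecting the write or evolving the table schema.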

Contributor

For MOR tables, are any other changes needed to make partial update work?

dataGen.generatePartialUpdateInsertsStream(commitTime1, 100, false, TRIP_SCHEMA, false).collect(Collectors.toList()); // this writes ~500kb
List<HoodieKey> insertKeys1 = recordsToHoodieKeys(inserts1);
upsertAndCheck(client, insertKeys1, commitTime1, false);

Contributor

remove this line

List<String> oldSchemaFieldNames = AVRO_TRIP_SCHEMA.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
List<String> parquetFieldNames = schema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());


Contributor

ditto

@@ -461,6 +494,11 @@ public static void createSavepointFile(String basePath, String instantTime, Conf
() -> UUID.randomUUID().toString());
}

public Stream<HoodieRecord> generatePartialUpdateInsertsStream(String commitTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
Contributor

please split the parameters into two lines.

@@ -486,6 +524,28 @@ public static void createSavepointFile(String basePath, String instantTime, Conf
});
}

public Stream<HoodieRecord> generatePartialUpdateInsertsStream(String instantTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions,
Contributor

ditto

+ "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", "
+ "\"default\": false}]}";

public static final String PARTIAL_TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},"
Contributor

split into two lines.

@Karl-WangSK
Contributor

Hi @liujinhui1994, a little concern.
The schema in SparkLazyInsertIterable.computeNext doesn't change when partial update is used. Will this still work?

@liujinhui1994
Contributor Author

@Karl-WangSK Can you describe it in detail?

4 participants