[HUDI-2176, 2178, 2179] Adding virtual key support to COW table #3306
Conversation
Left some notes for the reviewer.
@@ -278,7 +291,8 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
   * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
   */
  public void write(GenericRecord oldRecord) {
-   String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+   String key = populateMetaFields ? oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString() :
Not sure if we need to abstract this out and keep it outside of MergeHandle itself. There are only two options: either use the meta columns, or use the key generator to compute record keys. So I have decided to manage it here itself.
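For context, a minimal sketch of the two-branch key lookup being discussed. The helper wrapper is hypothetical, and the key-generator branch is an assumption based on the truncated diff line above:

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.BaseKeyGenerator;

// Hypothetical helper illustrating the two options named above: read the key
// from the _hoodie_record_key meta column, or recompute it via the key generator.
final class RecordKeyResolver {
  static String resolveKey(GenericRecord oldRecord, boolean populateMetaFields,
                           Option<BaseKeyGenerator> keyGeneratorOpt) {
    return populateMetaFields
        ? oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()
        : keyGeneratorOpt.get().getRecordKey(oldRecord);  // assumed else branch
  }
}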
      prepRecordWithMetadata(avroRecord, record, instantTime,
          taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
      super.write(avroRecord);
      writeSupport.add(record.getRecordKey());
As of this patch, I assume bloom goes hand in hand with meta columns. If populateMetaFields is false, we are not adding the bloom index. We can add follow-up patches to decouple these.
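In other words, the coupling amounts to something like this sketch, assuming the conditional bloom-filter creation that appears later in this thread:

// Sketch: the bloom filter (and hence bloom index support) is only wired up
// when meta fields are populated; decoupling is left to a follow-up patch.
boolean enableBloomFilter = populateMetaFields;
BloomFilter filter = enableBloomFilter ? createBloomFilter(config) : null;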
@@ -225,6 +229,22 @@ protected void initMetaClient(HoodieTableType tableType) throws IOException {
    metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
  }

  protected Properties getPropertiesForKeyGen() {
HoodieTestDataGenerator has these fields in the commonly used schema, hence they are hardcoded here so that all tests can call into this to generate props.
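A plausible shape of this helper, assuming the standard Hudi key-generator property keys and the _row_key / partition_path fields from the test trip schema shown later in this thread:

import java.util.Properties;

// Sketch (property keys assumed): point the key generator at the fields that
// HoodieTestDataGenerator's common trip schema already carries.
protected Properties getPropertiesForKeyGen() {
  Properties properties = new Properties();
  properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
  properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
  return properties;
}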
   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
   */
  @Override
  public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, BaseKeyGenerator keyGenerator) {
Not sure if we can add another argument to the existing API and generate/fetch record keys and partition paths based on that. Felt this is neater.
Probably, but this method has a lot of code duplication at the moment. Can we reduce that?
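For reference, a sketch of the API shape under discussion, with the existing meta-column method and the new keygen-aware overload side by side (the interface name here is hypothetical):

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.keygen.BaseKeyGenerator;

// Hypothetical interface showing the two variants discussed above.
interface RecordKeyFetcher {
  // Existing: read record key and partition path from the hoodie meta columns.
  List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);

  // New overload: recompute them from the data columns via the key generator.
  List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath,
                                              BaseKeyGenerator keyGenerator);
}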
@@ -1588,6 +1588,11 @@ public Builder withWriteMetaKeyPrefixes(String writeMetaKeyPrefixes) {
      return this;
    }

    public Builder withPopulateMetaFields(boolean populateMetaFields) {
Can you please clarify something for me? If we wish to store the property in hoodie.properties as part of HoodieTableConfig, then should we set the config via HoodieWriteConfigBuilder.withHoodieTableConfig(new HoodieTableConfigBuilder().withPopulate... ) sort of, and is exposing a setter here in HoodieWriteConfig not the right way to go about it?
It was easier for me in tests to set this param, but I wanted to know the right way to go about it.
Yeah, composing it like that is probably the right way. Separate out the table configs from the write configs.
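Roughly, the composition being suggested would look something like this sketch (HoodieTableConfigBuilder and the withHoodieTableConfig/withPopulateMetaFields builder methods are hypothetical here, following the wording of the question above):

// Hypothetical sketch: keep table-level properties (persisted to
// hoodie.properties) in a table config that is composed into the write
// config, instead of a flat setter on HoodieWriteConfig.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withHoodieTableConfig(new HoodieTableConfigBuilder()
        .withPopulateMetaFields(false)
        .build())
    .build();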
      TaskContextSupplier taskContextSupplier, boolean populateMetaFields, boolean enableBloomFilter) throws IOException {
    BloomFilter filter = enableBloomFilter ? createBloomFilter(config) : null;
    HoodieAvroWriteSupport writeSupport =
        new HoodieAvroWriteSupport(new AvroSchemaConverter(hoodieTable.getHadoopConf()).convert(schema), schema, filter);
HoodieAvroWriteSupport already handles a null bloom filter, hence using null here. If you prefer, I can change that to Option and fix this.
Yes, let's fix this to not do nulls, if it's not a lot of change.
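The null-free variant could look like this minimal sketch, assuming HoodieAvroWriteSupport accepts an Option<BloomFilter> (that constructor signature is an assumption here):

// Sketch: carry the bloom filter as an Option so the write support never
// receives a raw null (the Option-taking constructor is assumed).
Option<BloomFilter> filter = enableBloomFilter
    ? Option.of(createBloomFilter(config))
    : Option.empty();
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
    new AvroSchemaConverter(hoodieTable.getHadoopConf()).convert(schema), schema, filter);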
Codecov Report
@@ Coverage Diff @@
## master #3306 +/- ##
=============================================
- Coverage 47.74% 27.76% -19.99%
+ Complexity 5591 1330 -4261
=============================================
Files 938 386 -552
Lines 41823 15582 -26241
Branches 4213 1390 -2823
=============================================
- Hits 19968 4326 -15642
+ Misses 20070 10932 -9138
+ Partials 1785 324 -1461
Force-pushed from 901081a to 2c243ca.
@vinothchandar: you are good to review this patch.
@@ -101,33 +103,44 @@
  protected long updatedRecordsWritten = 0;
  protected long insertRecordsWritten = 0;
  protected boolean useWriterSchema;
  protected boolean populateMetaFields;
  protected Option<BaseKeyGenerator> keyGeneratorOpt;
I initially thought of moving this to HoodieWriteHandle, thinking all handles might need this, but it looks like neither the create nor the append handle needs the key generator, since on the write path we have the HoodieKey handy.
@@ -90,6 +90,7 @@
  public static final int DEFAULT_PARTITION_DEPTH = 3;
  public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
      + "{\"name\": \"timestamp\",\"type\": \"long\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
      + "{\"name\": \"partition_path\", \"type\": \"string\"},"
Looks like we didn't have any field in the schema to hold just the partition path value. We always generate it and hold it within HoodieKey, which eventually goes into the meta fields. Hence I had to add this field.
Force-pushed from 719bb10 to 9674330.
There was a PR around this before as well. Can we create a blocker JIRA for 0.10.0 that picks up all configs like that and ensures the validations are in place?
Can we file an umbrella JIRA to track all this, and move all these issues into it as well?
Force-pushed from 9674330 to 4970e91.
Looks good overall. Lots of code comments.
          hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
          .map(entry -> Pair.of(entry,
              new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));
    if (config.populateMetaFields()) {
You already decide in an upper layer to pass in Option.empty if config.populateMetaFields() == true, right? In these cases, it's advisable to just use keyGeneratorOpt.map(keyGen -> /* else block call */).orElse(/* if block */) and not rely on checking the config again and again.
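Concretely, the suggestion amounts to something like this sketch, assuming Hudi's Option mirrors java.util.Optional's orElseGet (used here to keep the fallback lazy; the rest of the names follow the diff):

// Sketch: branch on keyGeneratorOpt itself rather than re-reading the config.
return keyGeneratorOpt
    .map(keyGen -> BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
        hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGen))
    .orElseGet(() -> BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
        hoodieTable.getHadoopConf(), new Path(baseFile.getPath())))
    .stream()
    .map(entry -> Pair.of(entry,
        new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())));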
    } else {
      return BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath(
          hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt.get()).stream()
          .map(entry -> Pair.of(entry,
Can we avoid repeating lines 63 and 58?
    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
    init(fileId, recordItr);
    init(fileId, partitionPath, baseFile);
    this.populateMetaFields = config.populateMetaFields();
Same question: can we just work off keyGeneratorOpt.isEmpty()?
    setAndValidateKeyGenProps(keyGeneratorOpt);
  }

  private void setAndValidateKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt) {
Validate and then set?
  }

  private void initKeyGenIfNeeded() {
    this.populateMetaFields = config.populateMetaFields();
Move this to the constructor?
    try {
      keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
    } catch (IOException e) {
      throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
Move this exception handling into the method itself? It's an unchecked exception anyway; we can save some lines.
Not very sure on this. Did you mean to suggest moving this to the caller of this method, or within HoodieSparkKeyGeneratorFactory.createKeyGenerator?
Because we have two constructors, and to avoid duplicating code I am doing this in a private method.
Also, we can't move it within HoodieSparkKeyGeneratorFactory.createKeyGenerator, because here we are casting to BaseKeyGenerator since, when meta fields are disabled, we have some constraints around key generators.
Yes, within HoodieSparkKeyGeneratorFactory.createKeyGenerator. You can still cast outside, right?
But I added it as a guard for catching casting issues.
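A minimal sketch of that guard, assuming the creation stays in the handle's private method (the exception messages are illustrative):

// Sketch: create the key generator, then guard the cast so a misconfigured
// (non-Base) key generator fails fast with a clear message.
try {
  KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(
      new TypedProperties(config.getProps()));
  if (!(keyGenerator instanceof BaseKeyGenerator)) {
    throw new HoodieException("Only BaseKeyGenerators are supported when meta columns are disabled, found: "
        + keyGenerator.getClass().getName());
  }
  keyGeneratorOpt = Option.of((BaseKeyGenerator) keyGenerator);
} catch (IOException e) {
  throw new HoodieIOException("Failed to instantiate key generator", e);
}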
  /**
   * Fetch schema for record key and partition path.
   */
  public static Schema getRecordKeyPartitionPathSchema(Schema fileSchema, List<String> recordKeyFields, List<String> partitionPathFields) {
Unit test for this?
Any reason why we can't just merge the lists outside and keep this method simpler, i.e. take a list of fields and get a subschema? In fact, we may already have a method like that somewhere that we can reuse.
Looks like we don't have one already, but I will fix this method to be generic.
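A generic version might look like the following sketch (assuming an Avro 1.9+ Schema.Field constructor; callers would concatenate the record key and partition path field lists before calling):

import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;

// Sketch: project an Avro schema down to an arbitrary list of field names,
// so record key and partition path fields can be merged by the caller.
// New Field objects are created because Avro fields cannot be shared
// between schemas.
public static Schema getSchemaForFields(Schema fileSchema, List<String> fields) {
  List<Schema.Field> projected = fileSchema.getFields().stream()
      .filter(f -> fields.contains(f.name()))
      .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
      .collect(Collectors.toList());
  Schema result = Schema.createRecord(fileSchema.getName() + "_projected",
      fileSchema.getDoc(), fileSchema.getNamespace(), false);
  result.setFields(projected);
  return result;
}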
  public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, BaseKeyGenerator keyGenerator) {
    List<HoodieKey> hoodieKeys = new ArrayList<>();
    try {
      if (!filePath.getFileSystem(configuration).exists(filePath)) {
Let's avoid this call and have it error out if the file does not exist.
The existing fetchRecordKeyPartitionPath() already does this. I assume you are suggesting to fix all occurrences.
Probably, but this method has a lot of code duplication at the moment. Can we reduce that?
Will address the feedback.
Force-pushed from 7b789d7 to 8a0661b.
Force-pushed from 8a0661b to 77fc978.
What is the purpose of the pull request
Changes not covered in this PR:
To discuss:
With virtual keys, we are imposing a constraint that the key generator for a given table cannot change from the time of its inception. Given this constraint, should we add some validation in HoodieSparkSqlWriter or WriteClient so that the keyGen does not change over time for a given table?
The reason I am asking is that we have some loose ends even today. For example, if someone switches the index type midway, I don't think we validate and throw a proper exception that the index type can't be changed to an incompatible one. Not saying we have to follow that; just a reminder.
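If we do add it, the validation could be as simple as this sketch (the config accessor names are assumptions; the idea is to compare what was recorded in hoodie.properties at table creation against the incoming write config):

// Hypothetical sketch: fail fast if the incoming key generator class differs
// from the one recorded at table creation time (accessor names assumed).
String tableKeyGen = tableConfig.getKeyGeneratorClassName();
String incomingKeyGen = writeConfig.getKeyGeneratorClass();
if (tableKeyGen != null && !tableKeyGen.equals(incomingKeyGen)) {
  throw new HoodieException(String.format(
      "Key generator cannot be changed after table creation: table has %s, write config has %s",
      tableKeyGen, incomingKeyGen));
}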
Brief change log
Verify this pull request
This change added tests and can be verified as follows:
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.