New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Hoodie Event callbacks #251
Conversation
311522a
to
35d62fe
Compare
35d62fe
to
8f2c88b
Compare
CI failure is due to the race condition bug on master. This PR has been successfully tested on master with the patch given by @n3nash to avoid race temporarily. PR has has also been tested with an ingestion job. |
@@ -102,6 +105,17 @@ public StorageLevel getWriteStatusStorageLevel() { | |||
return StorageLevel.fromString(props.getProperty(WRITE_STATUS_STORAGE_LEVEL)); | |||
} | |||
|
|||
public <T extends WriteStatus> T getWriteStatusInstance() { | |||
try { | |||
Class<? extends WriteStatus> aClass = (Class<? extends WriteStatus>) Class |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use ReflectionUtils to cache the class instance and config is not the right place to instantiate the class. I would do it as a interface method (factory) in WriteStatus interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure Prasanna, following is what you mean right?
In HoodieWriteConfig add below
public WriteStatus getWriteStatus() {
return ReflectionUtils.loadClass(props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with what prasanna is saying. We can't be actually instantiating the class from the config class. i.e the method here can just return you the class name..
Please move the actual creation of the class i.e the ReflectionUtils call into the Handle classes themselves.
@@ -47,12 +48,14 @@ | |||
private long totalRecords = 0; | |||
private long totalErrorRecords = 0; | |||
|
|||
public void markSuccess(HoodieRecord record) { | |||
public void markSuccess(HoodieRecord record, | |||
Map<String, String> recordMetadata) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make WriteStatus a interface and have a Default implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, Im planning to have the default implementation also not use recordMetadata. Please let me know if you prefer having the recordMetadata merging logic in the default impl too.
* This method can be used to fetch metadata regarding the HoodieRecordPayload, for the purpose | ||
* of capturing some metrics. | ||
*/ | ||
Map<String, String> getMetadata(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a little concerned with having a map of properties on the record payload. Adding data to record payload is very tricky in terms of shuffle. It put in a 100 properties and tuned it for 10 - things may fail miserably during a shuffle. If we need to encode data - why dont we just do it in the InsertRecord. Can you may be give me an example where storing it in the record itself would not make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CMIIW, even if we encode it in the InsertRecord (or HoodieRecordPayload), we need this method to fetch the metadata just before HoodieRecordPayload is deflated like in 8f2c88b#diff-165908ecbfe7dbae538c00e8157e7be0R129
Regarding storing properties in a map - did you recommend storing it in the IndexedRecord instead? My concern was inadvertent stuff being put into IndexRecord affecting parquet schema. But we are planning to do that anyway for across_batch_de-duped counts, so yes, I could actually move these properties into IndexedRecord itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC you want a mechanism to pass some data from each record and have it propagated down to a custom WriteStatus class? Since it was mostly you and @prazanna who discussed this and neither the ticket nor the PR describes what the commit is about, @kaushikd49 could you please summarize what the real use-case is ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am okay with this approach @kaushikd49. We may want to enable adding properties to HoodieRecordPayload with a property in HoodieWriteConfig (disabled by default). It may be good idea to write up a small doc on how you are planning to use this map downstream internally and share it with @vinothchandar - just so he has context. Thanks and sorry for the delay
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vinothchandar Yea that is right. I will be sharing a doc with you shortly explaining the use-case.
@prazanna Sure, I'll do that.
@prazanna Thanks a lot for your comments. Had few questions on the comments. Could you please help answer them? |
@kaushikd49 I will take a look and in the meantime, can you please rebase this again off master |
8f2c88b
to
66312b7
Compare
Thanks, done @vinothchandar |
@@ -91,6 +97,14 @@ public String getPartitionPath() { | |||
} | |||
} | |||
|
|||
@Override | |||
public Map<String, String> getMetadata() { | |||
Map<String, String> metadataMap = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you come up with some reasonable values here for the map? Current values don't convey your usage pattern well IMO
@@ -104,7 +105,7 @@ private HoodieWriteConfig makeHoodieClientConfig() throws Exception { | |||
private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() throws Exception { | |||
// Prepare the AvroParquetIO | |||
String schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); | |||
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr); | |||
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr).withWriteStatusClass(SampleMergeMetadataWriteStatus.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please write a separate test case, which demonstrates how the framework should be used. Lets not change the existing tests with a custom WriteStatus class, as I want to ensure the default usage with WriteStatus.class continues to be exercised.
@@ -57,6 +59,11 @@ public HoodieJsonPayload(String json) throws IOException { | |||
return Optional.of(jsonConverter.convert(getJsonData())); | |||
} | |||
|
|||
@Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you push this to the base class, instead of implementing in each subclass
* This method can be used to fetch metadata regarding the HoodieRecordPayload, for the purpose | ||
* of capturing some metrics. | ||
*/ | ||
Map<String, String> getMetadata(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, can you make this an Optional<Map<String, String>>
and always return Optional.empty from the base class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Love some clarification on actual use-case
- Don't have actual instantiation in the HoodieWriteConfig class
- API change & clear documentation for
getMetadata
, and alsomarkSuccess
/markFailure
methods to explain what is passed in, warnings about having a large map in each record etc.. - Leaving existing tests working on
WriteStatus
- Write a separate test case (not a file, just a case) which clearly demonstrates your use-case
@@ -47,12 +48,14 @@ | |||
private long totalRecords = 0; | |||
private long totalErrorRecords = 0; | |||
|
|||
public void markSuccess(HoodieRecord record) { | |||
public void markSuccess(HoodieRecord record, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadocs for parameters..
@vinothchandar @prazanna Thank you, will send out an updated PR |
073807f
to
38c134a
Compare
@vinothchandar @prazanna Addressed your comments.
Did not add this property because in the default WriteStatus implementation class (WriteStatusImpl), the metadata map is anyway not used. Also, this adds a dependency to HoodieWriteConfig class from WriteStatus Interface. Open to your suggestions on this. |
@@ -74,7 +73,7 @@ public HoodieAppendHandle(HoodieWriteConfig config, | |||
String fileId, | |||
Iterator<HoodieRecord<T>> recordItr) { | |||
super(config, commitTime, hoodieTable); | |||
WriteStatus writeStatus = new WriteStatus(); | |||
WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you re going to rely on a constructor being called via Reflection, then making WriteStatus
an interface is not a necessary thing.. Atleast in the subclassing route, you will be forcing the subclass to implement atleast 1 constructor of the same signature as base class
Was this part of @prazanna 's suggestion earlier on? (given then comment history is nt there now)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vinothchandar Yea, it was part of @prazanna's suggestion earlier. The rebase has hidden the comment. Ok, I'll retain WriteStatus like before as a class.
HoodieWriteConfig cfg = getConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build(); | ||
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); | ||
|
||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is only 1 write.. please remove comment block.
Generally okay with the change. 1 question on interfacing vs subclassing. Also please ensure you rebased against latest master, since I changed WriteStatus yest |
38c134a
to
0ac219f
Compare
@vinothchandar Done. Yea it is rebased with latest master. |
|
||
/** | ||
* Status of a write operation. | ||
*/ | ||
public class WriteStatus implements Serializable { | ||
|
||
private final HashMap<HoodieKey, Throwable> errors = new HashMap<>(); | ||
private final HashMap<HoodieKey, Throwable> errors = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indenting has changed here throughout.. Can you reformat this file and ensure the only diffs we see here are the two methods? https://uber.github.io/hoodie/code_structure.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about that.
Im using the same code style, and it seems like it is using +2 spaces for indentation. https://google.github.io/styleguide/javaguide.html#s4.2-block-indentation
@@ -238,7 +239,40 @@ public void testUpdateRecords() throws Exception { | |||
return records; | |||
} | |||
|
|||
@Test public void testInsertWithPartialFailures() throws Exception { | |||
// Check if record level metadata is aggregated properly at the end of write. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
watch for indentation here too
@@ -20,6 +20,8 @@ | |||
import com.uber.hoodie.common.model.HoodieRecordPayload; | |||
import com.uber.hoodie.exception.HoodieException; | |||
|
|||
import java.util.Collections; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra changes that are not needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cosmetic changes. Should be ready to merge otherwise
0ac219f
to
40b1701
Compare
@vinothchandar Done. |
Addresses #153 (Implement Hoodie Event callbacks)