
[RFC-15][HUDI-1325] Merge updates of unsynced instants to metadata table #2342

Merged 1 commit into apache:rfc-15 on Dec 28, 2020

Conversation

rmpifer (Contributor) commented Dec 16, 2020

What is the purpose of the pull request

Currently, every time an operation (e.g. commit, clean, rollback) completes, we attempt to keep the metadata table up to date by applying an upsert to the metadata table at the same instant time. This way we can reference both timelines and directly compare their instants to see whether they are in sync.

It is still possible, however, for the dataset timeline and the metadata table timeline to fall out of sync. Reading from the metadata table while the timelines are out of sync returns incorrect values for getAllFilesInPartition and getAllPartitionPaths.

This change addresses that scenario by reading the unsynced timeline instants and merging them with the existing metadata table records to produce the most up-to-date state of the file system.

JIRA: https://issues.apache.org/jira/browse/HUDI-1325

Brief change log

Timeline Sync

  • The logic for converting timeline metadata into metadata table records was tied directly to the commit phase in HoodieBackedTableMetadataWriter. Refactored this logic into a utility class, HoodieTableMetadataTimelineUtil.
  • Created a scanner, HoodieMetadataMergedInstantRecordScanner, which handles converting timeline instants to metadata records and merging the results.
  • Added a final step in AbstractHoodieTableMetadata.getMergedRecordByKey which uses the new scanner to fetch the HoodieRecord associated with the desired key from the unsynced timeline instants and merge it with the record from the metadata table (a sketch of this merge follows this list).
  • When converting a rollback operation to metadata table records, there was logic that re-read from the metadata table to ensure that any files being deleted as part of the rollback actually existed:
// Rollbacks deletes instants from timeline. The instant being rolled-back may not have been synced to the
// metadata table. Hence, the deleted files need to be checked against the metadata.

This doesn't make sense, since all instants are processed in serial order, so there would never be a case where a rollback is written before an earlier instant on the timeline has been synced. Removed this logic because it created a circular dependency when implementing timeline merging.

  • Changed the validate-metadata step in tests to use the metadata reader HoodieBackedTableMetadata. By default, when the metadata writer HoodieBackedTableMetadataWriter is initialized it syncs all instants to the metadata table; by using the reader we can simulate the metadata table being out of sync.
  • Modified initMetaClient in the test base class to allow the table type to be passed in, since the table type was always set to COPY_ON_WRITE when using this method to initialize the meta client.
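
A minimal sketch of that merge step as it is discussed later in this review (method body only; it assumes the names used in this PR, such as getRecordByKeyFromMetadata and timelineRecordScanner, and is illustrative rather than the exact merged code):

    protected Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) throws IOException {
      // Record as currently stored in the metadata table (possibly absent or stale).
      Option<HoodieRecord<HoodieMetadataPayload>> metadataHoodieRecord = getRecordByKeyFromMetadata(key);
      // Record derived from timeline instants that have not yet been synced to the metadata table.
      Option<HoodieRecord<HoodieMetadataPayload>> timelineHoodieRecord = timelineRecordScanner.getRecordByKey(key);
      if (!timelineHoodieRecord.isPresent()) {
        return metadataHoodieRecord;
      }
      if (!metadataHoodieRecord.isPresent()) {
        return timelineHoodieRecord;
      }
      // Both sides have state for this key: combine them, letting the unsynced timeline updates win.
      HoodieRecordPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(metadataHoodieRecord.get().getData());
      return Option.of(new HoodieRecord(metadataHoodieRecord.get().getKey(), mergedPayload));
    }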

Refactor

For the following reasons I changed the HoodieTableMetadata interface into an AbstractHoodieTableMetadata class, which contains the following shared functionality irrespective of how the metadata is stored (as a Hoodie table, in some key/value store, etc.); a rough skeleton follows this list:

  • Fetching getAllPartitionPaths and getAllFilesInPartition from metadata should validate that the metadata flag is enabled, and should default to a file listing if any error occurs, regardless of storage type.
  • During fetchAllPartitionPaths and fetchAllFilesInPartition, metrics should be published for the operations and, if validateLookups is enabled, the results returned from metadata should be compared against the actual file listing results, again regardless of storage type.
  • In getMergedRecordByKey, regardless of how the key is fetched from storage, the result is merged with records from the unsynced timeline instants. This is why I introduced the abstract getRecordByKeyFromMetadata, which is implemented by the inheriting class and contains the logic specific to the storage type.
  • Moved findInstantsToSync to AbstractHoodieTableMetadata. This method is needed regardless of storage type, to determine whether the metadata is in sync with the timeline. However, since how the last synced instant is stored can depend on the storage type, the method is abstract.
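
A rough skeleton of that split (the class and abstract method names follow the description above; the fields, parameters, and helper methods are assumptions added for illustration):

    public abstract class AbstractHoodieTableMetadata implements Serializable {

      protected boolean enabled;          // is the metadata table enabled at all
      protected boolean validateLookups;  // compare metadata results against an actual file listing

      // Shared behaviour: check the flag, publish metrics, optionally validate, and fall back
      // to a plain file listing on any error, regardless of how the metadata is stored.
      public List<String> getAllPartitionPaths() throws IOException {
        if (enabled) {
          try {
            return fetchAllPartitionPaths();
          } catch (Exception e) {
            // fall through to the direct file listing below
          }
        }
        return listPartitionsFromFileSystem();
      }

      // Storage-specific: how a single metadata record is looked up (HFile scan, key/value get, ...).
      protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key);

      // Storage-specific: where the last synced instant is recorded depends on the backing store.
      protected abstract List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient);

      // Hypothetical helpers supplied by the concrete implementation.
      protected abstract List<String> fetchAllPartitionPaths() throws IOException;
      protected abstract List<String> listPartitionsFromFileSystem() throws IOException;
    }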

Verify this pull request

This change added tests and can be verified as follows:

Testing

Ran TestHoodieBackedMetadata, which contains all tests related to the metadata table.

Added tests which use an unsynced client after commits, cleans, and restores have been performed, to ensure that updates not yet written to the metadata table are still reflected when reading from the metadata.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

vinothchandar (Member) commented:

@rmpifer could you please rebase this against the latest rfc-15 branch? I'll get started with the review in the meantime.

vinothchandar (Member) left a comment:

Two small structural comments.

I am still mulling if it's alright to just implement this at the scanner level, and not have a wrapper HoodieTableMetadata implementation around it.

@@ -635,9 +635,9 @@ public boolean requireSortedRecords() {
return getBaseFileFormat() == HoodieFileFormat.HFILE;
}

public HoodieTableMetadata metadata() {
public AbstractHoodieTableMetadata metadata() {
Member:

I would prefer to still have an interface for HoodieTableMetadata and return that here. We can have the abstract class internal to the actual implementation?

Contributor Author:

I maintained this interface in the revision after rebasing and then built AbstractHoodieTableMetadata on top of it.

/**
* A utility to convert timeline information to metadata table records.
*/
public class HoodieTableMetadataTimelineUtil {
Member:

I think we can simply call this HoodieTableMetadataUtils ?

Contributor Author:

Updated this

rmpifer force-pushed the rfc-15 branch 2 times, most recently from de31e8c to 78253ff on December 18, 2020 at 01:19
vinothchandar (Member) commented:

@rmpifer On this point specifically,

so there would never be the case where a rollback was being written before an instant earlier on the timeline was already synced. Removed this logic because it created circular dependency when implementing timeline merging

The following situation is technically possible. The timeline has commits c1, c2, c3, c4 and we have synced only up to c2. The user decides to restore the table to commit time c2, thus generating rollback metadata for c3, c4, which now have no files in the metadata table. I think we should defensively handle this scenario at the payload merge level, i.e. if there is a delete for a file that does not exist in the base file or prior instants, we should just do a no-op and continue. What I am saying is: I agree with you that the extra logic need not be present, but we should check the payload once to ensure we can handle the scenario above.

Let me know if we are not talking about the same thing :)
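
A tiny sketch of the defensive handling being suggested here, with hypothetical names (this is not the actual HoodieMetadataPayload merge code): when a partition's file list is combined with an update, a delete for an unknown file is simply ignored rather than treated as an error.

    // Combine the file listing state for one partition with an incoming (possibly unsynced) update.
    Map<String, Long> mergeFiles(Map<String, Long> existing, Map<String, Long> appended, Set<String> deleted) {
      Map<String, Long> merged = new HashMap<>(existing);
      appended.forEach((file, size) -> merged.merge(file, size, Math::max));
      // A delete for a file we never saw (e.g. its commit was rolled back before being synced)
      // is a no-op instead of an error.
      deleted.forEach(merged::remove);
      return merged;
    }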

vinothchandar (Member) left a comment:

LGTM overall. Comments on structure, corner case, testing

I think we can stick with the scanner approach. It's more elegant than what I had in mind, i.e. the wrapped HoodieTableMetadata impl.

@@ -266,7 +256,7 @@ private void initialize(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaC
}

private void initTableMetadata() {
this.metadata = new HoodieBackedTableMetadata(hadoopConf.get(), datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath(),
this.metadata = (HoodieBackedTableMetadata) AbstractHoodieTableMetadata.create(hadoopConf.get(), datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath(),
Member:

I think we can keep the factory method in the interface?

Contributor Author:

Along with Prashant's comment, we can just create HoodieBackedTableMetadata directly, since HoodieBackedTableMetadataWriter should only be associated with this implementation.

int[] fileChangeCount = {0, 0}; // deletes, appends

partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
// Rollbacks deletes instants from timeline. The instant being rolled-back may not have been synced to the
Member:

Left a comment around this scenario. There is one valid case here. Good call-out in the summary.

HoodieRecordPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(metadataHoodieRecord.get().getData());
metadataHoodieRecord = Option.of(new HoodieRecord(metadataHoodieRecord.get().getKey(), mergedPayload));
} else {
metadataHoodieRecord = timelineHoodieRecord;
Member:

just return out of here, instead of reassigning to another important variable. may be easier to read.

Contributor Author:

I created a separate variable for this so there is still a single point of return.

Member:

Sorry, my core concern was that we are reassigning to a variable that holds the returned value from a key method call, getRecordByKeyFromMetadata.

I was suggesting something like:

    Option<HoodieRecord<HoodieMetadataPayload>> metadataHoodieRecord = getRecordByKeyFromMetadata(key);
    // Retrieve record from unsynced timeline instants
    Option<HoodieRecord<HoodieMetadataPayload>> timelineHoodieRecord = timelineRecordScanner.getRecordByKey(key);
    if (timelineHoodieRecord.isPresent()) {
      if (metadataHoodieRecord.isPresent()) {
        HoodieRecordPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(metadataHoodieRecord.get().getData());
        return Option.of(new HoodieRecord(metadataHoodieRecord.get().getKey(), mergedPayload));
      } else {
        return timelineHoodieRecord;
      }
    }

    return metadataHoodieRecord;

If you feel strongly, we can keep it as-is.

Contributor Author:

Sorry, my comment was unclear. I created a separate mergedRecord variable so I am not overwriting metadataHoodieRecord.

rmpifer (Contributor Author) commented Dec 21, 2020

@rmpifer On this point specifically,

so there would never be the case where a rollback was being written before an instant earlier on the timeline was already synced. Removed this logic because it created circular dependency when implementing timeline merging

Following situation is technically possible. Timeline has commits c1, c2, c3, c4 and we have synced only upto C2. User decides to restore the table to commit time c2, thus generating rollback metadata for c3, c4 which now have no files in the metadata table. I think we should defensively handle this scenario at the payload merge level i.e if there is a delete for a file that does not exist in the base file or prior instants, we should just do a no-op and continue. What I am saying is : I agree with you that the extra logic need not be present, but we should check the payload once to ensure we can handle the scenario above.

Let me know if we are not talking about the same thing :)

If this is the case, what happens when c3 and c4 are then finally synced? These files would remain in the metadata table even though the rollback should have removed them.

prashantwason (Member) left a comment:

if there is a delete for a file that does not exist in the base file or prior instants, we should just do a no-op and continue.

How do we differentiate this from a bug which led to files missing from the metadata listing?

The current merge code provides some sort of correctness - deletes are always for files already created. I think we should maintain this.

* @param instant
* @throws IOException
*/
private void processInstant(HoodieInstant instant) throws IOException {
Member:

Can be merged with the above function. I don't see it being called anywhere else.

Contributor Author:

Updated

@@ -266,7 +256,7 @@ private void initialize(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaC
}

private void initTableMetadata() {
this.metadata = new HoodieBackedTableMetadata(hadoopConf.get(), datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath(),
Member:

The reader-writer pairs in metadata are related, so this can be simplified to simply new HoodieBackedTableMetadata.

HoodieBackedTableMetadata cannot work with any other implementation of HoodieTableMetadata.

Contributor Author:

Agreed. Updated this

vinothchandar (Member) commented:

If this is the case what happens when c3 and c4 are then finally synced? These files would remain in the metadata table even though rollback should have removed them

So the restore in that example would delete C3, C4 from the timeline; it won't be synced at all.

How do we differentiate this from a bug which lead to files missing from the metadata listing? The current merge code provides some sort of correctness - deletes are always for files already created. I think we should maintain this.

I understand the need for wanting to fail fast, but the scenario above is legit, right? Do you suggest we sync the rollbacks only if the original instants for which the rollback was done were synced to the metadata table? cc @prashantwason
Let's make the call on this one. Otherwise the PR seems more or less ready, after another rebase/round of comments?

rmpifer (Contributor Author) left a comment:

If this is the case what happens when c3 and c4 are then finally synced? These files would remain in the metadata table even though rollback should have removed them

So restore in that example, would delete C3, C4 from the timeline. it won't be synced at all.

How do we differentiate this from a bug which lead to files missing from the metadata listing? The current merge code provides some sort of correctness - deletes are always for files already created. I think we should maintain this.

I understand the need for wanting to fail-fast. but the scenario above is legit, right? Do you suggest we sync the rollbacks only if the original instants for which the rollback was done were synced to the metadata table? cc @prashantwason
Let's make the call on this one. Otherwise PR seems ready more or less, after another rebase/set of comments. ?

If this is the case for restore, that they are removed from the dataset timeline, then having this logic makes sense to me.

If we would like to maintain the requirement that deletes are only for files already created, I can add this filtering logic. I like the approach Vinoth mentions: rather than filtering at the file level, check whether the instant specified in the rollback/restore was synced and skip it if not. Let me know your thoughts.


vinothchandar (Member) commented:

Do you suggest we sync the rollbacks only if the original instants for which the rollback was done were synced to the metadata table?

Thinking out loud about whether there are tricky cases here. Specifically, is there a chance of skipping a rollback instant incorrectly?
Let's say we restored the last 11 commits (C12 -> C2) with metadata synced up to C8:

data timeline : C1,  rollback C12, rollback C11, ..... rollback C2
metadata timeline : C1, C2, ... C8

Data timeline archival does not move past C8, but can technically archive up to C8. Similarly, the metadata timeline is free to archive independently; let's say it archives up to C2. If it ends up looking like:

data timeline : C1,  rollback C12, rollback C11, ..... rollback C2
metadata timeline: C3, ..., C8 

It's now hard to determine whether or not rollback C2 should be applied. (We could compare against the latest instant on the metadata timeline, C8, and assume it's applied?)

I think this complexity is unwarranted for the sake of failing fast. We are better off defensively ignoring deletes for files that were not logged before.

@prashantwason Hope you can be convinced of this :)

rmpifer (Contributor Author) commented Dec 23, 2020

data timeline : C1,  rollback C12, rollback C11, ..... rollback C2
metadata timeline: C3, ..., C8 

It's now hard to determine whether or not rollback C2 should be applied. (we could compare the latest instant on metadata timeline C8 and assume its applied?)

Regarding this scenario, my thoughts would be to check rollback metadata only against unsynced instants, i.e. whether any instants after C8 were considered in the rollback. Is this potentially surfacing a more general issue, though? Say the metadata timeline has been synced up to C2, but data timeline archival has happened up to C8. It seems the next time we try to find unsynced instants we won't consider anything between C2...C8.

vinothchandar (Member) commented:

Regarding this scenario, my thoughts would be to check rollback metadata only against unsynced instants, i.e. if any instants after C8 were considered in rollback.

To be clear: we want to skip syncing a rollback instant to the metadata table if the instant it rolled back is not in a completed state on the unsynced portion of the data timeline? That's what I meant by "we could compare the latest instant on metadata timeline C8 and assume it's applied". I think it will work out fine. Let's implement this?

Say metadata timeline has been synced up to C2, but data timeline archival has happened to C8.

We have code to guard against this already. Data timeline archiving won't move past the latest instant on the metadata timeline:

// If metadata table is enabled, do not archive instants which are more recent than the latest synced

vinothchandar (Member) commented:

@rmpifer @prashantwason let's just implement the check as discussed above and move on. We can file a follow-on JIRA if needed.
I would like to merge the feature branch to master ASAP, to give some time for testing.

rmpifer (Contributor Author) commented Dec 28, 2020

Apologies for the delay in pushing changes; I was offline over the holidays :). Updated to address feedback and to resolve our discussion: the change now checks whether the commit being rolled back is after the last sync point on the metadata timeline.
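
A minimal sketch of that check, with hypothetical names; the exact code merged in this PR may differ:

    // Decide whether a rollback's file deletions should be applied to the metadata table.
    boolean shouldApplyRollback(String rolledBackInstantTime, Option<String> lastSyncedInstantTime) {
      // Nothing has been synced yet, so there is nothing in the metadata table to delete.
      if (!lastSyncedInstantTime.isPresent()) {
        return false;
      }
      // Instant times are lexicographically ordered timestamps. A commit strictly after the last
      // synced instant was never written to the metadata table, so its rollback deletes would
      // target files that do not exist there and can safely be skipped.
      return rolledBackInstantTime.compareTo(lastSyncedInstantTime.get()) <= 0;
    }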

vinothchandar (Member) commented:

@rmpifer no worries. will review today and land

vinothchandar (Member) left a comment:

LGTM

@vinothchandar vinothchandar merged commit 11661dc into apache:rfc-15 Dec 28, 2020
vinothchandar pushed a commit to vinothchandar/hudi that referenced this pull request Dec 30, 2020
…able (apache#2342)

[RFC-15] Fix partition key in metadata table when bootstrapping from file system (apache#2387)

Co-authored-by: Ryan Pifer <ryanpife@amazon.com>
vinothchandar pushed a commit to vinothchandar/hudi that referenced this pull request Jan 4, 2021
…able (apache#2342)

[RFC-15] Fix partition key in metadata table when bootstrapping from file system (apache#2387)

Co-authored-by: Ryan Pifer <ryanpife@amazon.com>
vinothchandar pushed a commit that referenced this pull request Jan 4, 2021
…able (#2342)

[RFC-15] Fix partition key in metadata table when bootstrapping from file system (#2387)

Co-authored-by: Ryan Pifer <ryanpife@amazon.com>
prashantwason pushed a commit to prashantwason/incubator-hudi that referenced this pull request Feb 22, 2021
…able (apache#2342)

[RFC-15] Fix partition key in metadata table when bootstrapping from file system (apache#2387)

Co-authored-by: Ryan Pifer <ryanpife@amazon.com>
prashantwason pushed a commit to prashantwason/incubator-hudi that referenced this pull request Feb 22, 2021
…pache#2185)

Summary:
[MINOR] Make sure factory method is used to instanciate DFSPathSelector (apache#2187)

* Move createSourceSelector into DFSPathSelector factory method
* Replace constructor call with factory method
* Added some javadoc

[HUDI-1330] handle prefix filtering at directory level (apache#2157)

The current DFSPathSelector only ignore prefix(_, .) at the file level while files under subdirectories
e.g. (.checkpoint/*) are still considered which result in bad-format exception during reading.

[HUDI-1200] fixed NPE in CustomKeyGenerator (apache#2093)

- config field is no longer transient in key generator
- verified that the key generator object is shipped from the driver to executors, just the one time and reused for each record

[HUDI-1209] Properties File must be optional when running deltastreamer (apache#2085)

[MINOR] Fix caller to SparkBulkInsertCommitActionExecutor (apache#2195)

Fixed calling the wrong constructor

[HUDI-1326] Added an API to force publish metrics and flush them. (apache#2152)

* [HUDI-1326] Added an API to force publish metrics and flush them.

Using the added API, publish metrics after each level of the DAG completed in hudi-test-suite.

* Code cleanups

Co-authored-by: Vinoth Chandar <vinoth@apache.org>

[HUDI-1118] Cleanup rollback files residing in .hoodie folder (apache#2205)

[MINOR] Private the NoArgsConstructor of SparkMergeHelper and code clean (apache#2194)

1. Fix merge on read DAG to make docker demo pass (apache#2092)

1. Fix merge on read DAG to make docker demo pass (apache#2092)
2. Fix repeat_count, rollback node

[HUDI-1274] Make hive synchronization supports hourly partition (apache#2122)

[HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing. (apache#2197)

1. Added the --clean-input and --clean-output parameters to clean the input and output directories before starting the job
2. Added the --delete-old-input parameter to deleted older batches for data already ingested. This helps keep number of redundant files low.
3. Added the --input-parallelism parameter to restrict the parallelism when generating input data. This helps keeping the number of generated input files low.
4. Added an option start_offset to Dag Nodes. Without ability to specify start offsets, data is generated into existing partitions. With start offset, DAG can control on which partition, the data is to be written.
5. Fixed generation of records for correct number of partitions
  - In the existing implementation, the partition is chosen as a random long. This does not guarantee exact number of requested partitions to be created.
6. Changed variable blacklistedFields to be a Set as that is faster than List for membership checks.
7. Fixed integer division for Math.ceil. If two integers are divided, the result is not double unless one of the integer is casted to double.

[HUDI-1338] Adding Delete support to test suite framework (apache#2172)

- Adding Delete support to test suite.
         Added DeleteNode
         Added support to generate delete records

Use RateLimiter instead of sleep. Repartition WriteStatus to optimize Hbase index writes (apache#1484)

[HUDI-912] Refactor and relocate KeyGenerator to support more engines (apache#2200)

* [HUDI-912] Refactor and relocate KeyGenerator to support more engines

* Rename KeyGenerators

[HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files (apache#2190)

* [HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files
* [HUDI-892]  for test
* [HUDI-892]  fix bug generate array from split
* [HUDI-892] revert test log

[HUDI-1352] Add FileSystemView APIs to query pending clustering operations (apache#2202)

[HUDI-1375] Fix bug in HoodieAvroUtils.removeMetadataFields() method (apache#2232)

Co-authored-by: Wenning Ding <wenningd@amazon.com>

[HUDI-1358] Fix Memory Leak in HoodieLogFormatWriter (apache#2217)

[HUDI-1377] remove duplicate code (apache#2235)

[HUDI-1327] Introduce base implemetation of hudi-flink-client (apache#2176)

[HUDI-1400] Replace Operation enum with WriteOperationType (apache#2259)

[HUDI-1384] Decoupling hive jdbc dependency when HIVE_USE_JDBC_OPT_KEY set false (apache#2241)

[MINOR] clean up and add comments to flink client (apache#2261)

[MINOR] Add apacheflink label (apache#2268)

[HUDI-1393] Add compaction action in archive command (apache#2246)

[HUDI-1364] Add HoodieJavaEngineContext to hudi-java-client (apache#2222)

[HUDI-1396] Fix for preventing bootstrap datasource jobs from hanging via spark-submit (apache#2253)

Co-authored-by: Wenning Ding <wenningd@amazon.com>

[HUDI-1358] Fix leaks in DiskBasedMap and LazyFileIterable (apache#2249)

[HUDI-1392] lose partition info when using spark parameter basePath (apache#2243)

Co-authored-by: zhang wen <wen.zhang@dmall.com>

[MINOR] refactor code in HoodieMergeHandle (apache#2272)

[HUDI-1424]  Write Type changed to BULK_INSERT when set ENABLE_ROW_WRITER_OPT_KEY=true (apache#2289)

[HUDI-1373] Add Support for OpenJ9 JVM (apache#2231)

* add supoort for OpenJ9 VM
* add 32bit openJ9
* Pulled the memory layout specs into their own classes.

[HUDI-1357] Added a check to validate records are not lost during merges. (apache#2216)

- Turned off by default

[HUDI-1196] Update HoodieKey when deduplicating records with global index (apache#2248)

- Works only for overwrite payload (default)
- Does not alter current semantics otherwise

Co-authored-by: Ryan Pifer <ryanpife@amazon.com>

[HUDI-1349] spark sql support overwrite use insert_overwrite_table (apache#2196)

[HUDI-1343] Add standard schema postprocessor which would rewrite the schema using spark-avro conversion (apache#2192)

Co-authored-by: liujh <liujh@t3go.cn>

[HUDI-1427] Fix FileAlreadyExistsException when set HOODIE_AUTO_COMMIT_PROP to true (apache#2295)

[HUDI-1412] Make HoodieWriteConfig support setting different default … (apache#2278)

* [HUDI-1412] Make HoodieWriteConfig support setting different default value according to engine type

fix typo (apache#2308)

Co-authored-by: Xi Chen <chenxi07@qiyi.com>

[HUDI-1040] Make Hudi support Spark 3 (apache#2208)

* Fix flaky MOR unit test

* Update Spark APIs to make it be compatible with both spark2 & spark3

* Refactor bulk insert v2 part to make Hudi be able to compile with Spark3

* Add spark3 profile to handle fasterxml & spark version

* Create hudi-spark-common module & refactor hudi-spark related modules

Co-authored-by: Wenning Ding <wenningd@amazon.com>

[MINOR] Throw an exception when keyGenerator initialization failed (apache#2307)

[HUDI-1395] Fix partition path using FSUtils (apache#2312)

Fixed the logic to get partition path in Copier and Exporter utilities.

[HUDI-1445] Refactor AbstractHoodieLogRecordScanner to use Builder (apache#2313)

[MINOR] Minor improve in IncrementalRelation (apache#2314)

[HUDI-1439] Remove scala dependency from hudi-client-common (apache#2306)

[HUDI-1428] Clean old fileslice is invalid (apache#2292)

Co-authored-by: zhang wen <wen.zhang@dmall.com>
Co-authored-by: zhang wen <steven@stevendeMac-mini.local>

[HUDI-1448]  Hudi dla sync support skip rt table syncing (apache#2324)

[HUDI-1435] Fix bug in Marker File Reconciliation for Non-Partitioned datasets (apache#2301)

[MINOR] Improve code readability by passing in the fileComparisonsRDD in bloom index (apache#2319)

[HUDI-1376] Drop Hudi metadata cols at the beginning of Spark datasource writing (apache#2233)

Co-authored-by: Wenning Ding <wenningd@amazon.com>

[MINOR] Fix error information in exception (apache#2341)

[MINOR] Make QuickstartUtil generate random timestamp instead of 0 (apache#2340)

[HUDI-1406] Add date partition based source input selector for Delta streamer (apache#2264)

- Adds ability to list only recent date based partitions from source data.
- Parallelizes listing for faster tailing of DFSSources

[HUDI-1437]  support more accurate spark JobGroup for better performance tracking (apache#2322)

[HUDI-1470] Use the latest writer schema, when reading from existing parquet files in the hudi-test-suite (apache#2344)

[HUDI-115] Adding DefaultHoodieRecordPayload to honor ordering with combineAndGetUpdateValue (apache#2311)

* Added ability to pass in `properties` to payload methods, so they can perform table/record specific merges
* Added default methods so existing payload classes are backwards compatible.
* Adding DefaultHoodiePayload to honor ordering while merging two records
* Fixing default payload based on feedback

[HUDI-1419] Add base implementation for hudi java client (apache#2286)

[MINOR] Pass root exception to HoodieKeyGeneratorException for more information (apache#2354)

Co-authored-by: Xi Chen <chenxi07@qiyi.com>

[HUDI-1075] Implement simple clustering strategies to create ClusteringPlan and to run the plan

[HUDI-1471] Make QuickStartUtils generate deletes according to specific ts (apache#2357)

[HUDI-1485] Fix Deletes issued without any prior commits exception (apache#2361)

[HUDI-1488] Fix Test Case Failure in TestHBaseIndex (apache#2365)

[HUDI-1489] Fix null pointer exception when reading updated written bootstrap table (apache#2370)

Co-authored-by: Wenning Ding <wenningd@amazon.com>

[HUDI-1451] Support bulk insert v2 with Spark 3.0.0 (apache#2328)

Co-authored-by: Wenning Ding <wenningd@amazon.com>

- Added support for bulk insert v2 with datasource v2 api in Spark 3.0.0.

[HUDI-1487] fix unit test testCopyOnWriteStorage random failed (apache#2364)

[HUDI-1490] Incremental Query should work even when there are  partitions that have no incremental changes (apache#2371)

* Incremental Query should work even when there are  partitions that have no incremental changes

Co-authored-by: Sivabalan Narayanan <sivabala@uber.com>

[HUDI-1331] Adding support for validating entire dataset and long running tests in test suite framework (apache#2168)

* trigger rebuild

* [HUDI-1156] Remove unused dependencies from HoodieDeltaStreamerWrapper Class (apache#1927)

* Adding support for validating records and long running tests in test sutie framework

* Adding partial validate node

* Fixing spark session initiation in Validate nodes

* Fixing validation

* Adding hive table validation to ValidateDatasetNode

* Rebasing with latest commits from master

* Addressing feedback

* Addressing comments

Co-authored-by: lamber-ken <lamberken@163.com>
Co-authored-by: linshan-ma <mabin194046@163.com>

[HUDI-1481]  add  structured streaming and delta streamer clustering unit test (apache#2360)

[HUDI-1354] Block updates and replace on file groups in clustering (apache#2275)

* [HUDI-1354] Block updates and replace on file groups in clustering

* [HUDI-1354]  Block updates and replace on file groups in clustering

[HUDI-1350] Support Partition level delete API in HUDI (apache#2254)

* [HUDI-1350] Support Partition level delete API in HUDI

* [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction

* [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction

[HUDI-1495] Upgrade Flink version to 1.12.0 (apache#2384)

[MINOR] Remove the duplicate code in AbstractHoodieWriteClient.startCommit (apache#2385)

[HUDI-1398] Align insert file size for reducing IO (apache#2256)

* [HUDI-1398] Align insert file size for reducing IO

Co-authored-by: zhang wen <wen.zhang@dmall.com>

[HUDI-1484] Escape the partition value in HiveSyncTool (apache#2363)

[HUDI-1474] Add additional unit tests to TestHBaseIndex (apache#2349)

[HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate vali… (apache#2045)

* [HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate valid timestamps

Co-authored-by: Sivabalan Narayanan <sivabala@uber.com>

[HUDI-1493] Fixed schema compatibility check for fields. (apache#2350)

Some field types changes are allowed (e.g. int -> long) while maintaining schema backward compatibility within HUDI. The check was reversed with the reader schema being passed for the write schema.

[MINOR] Update report_coverage.sh (apache#2396)

[HUDI-1434] fix incorrect log file path in HoodieWriteStat (apache#2300)

* [HUDI-1434] fix incorrect log file path in HoodieWriteStat

* HoodieWriteHandle#close() returns a list of WriteStatus objs

* Handle rolled-over log files and return a WriteStatus per log file written

 - Combined data and delete block logging into a single call
 - Lazily initialize and manage write status based on returned AppendResult
 - Use FSUtils.getFileSize() to set final file size, consistent with other handles
 - Added tests around returned values in AppendResult
 - Added validation of the file sizes returned in write stat

Co-authored-by: Vinoth Chandar <vinoth@apache.org>

[HUDI-1418] Set up flink client unit test infra (apache#2281)

[MINOR] Sync UpsertPartitioner modify of HUDI-1398 to flink/java (apache#2390)

Co-authored-by: zhang wen <wen.zhang@dmall.com>

[HUDI-1423] Support delete in hudi-java-client (apache#2353)

[MINOR] Add maven profile to support skipping shade sources jars (apache#2358)

Co-authored-by: Xi Chen <chenxi07@qiyi.com>

[HUDI-842] Implementation of HUDI RFC-15.

 - Introduced an internal metadata table, that stores file listings.
 - metadata table is kept upto date with
 - Fixed handling of CleanerPlan.
 - [HUDI-842] Reduce parallelism to speed up the test.
 - [HUDI-842] Implementation of CLI commands for metadata operations and lookups.
 - [HUDI-842] Extend rollback metadata to include the files which have been appended to.
 - [HUDI-842] Support for rollbacks in MOR Table.
 - MarkerBasedRollbackStrategy needs to correctly provide the list of files for which rollback blocks were appended.
 - [HUDI-842] Added unit test for rollback of partial commits (inflight but not completed yet).
 - [HUDI-842] Handled the error case where metadata update succeeds but dataset commit fails.
 - [HUDI-842] Schema evolution strategy for Metadata Table. Each type of metadata saved (FilesystemMetadata, ColumnIndexMetadata, etc.) will be a separate field with default null. The type of the record will identify the valid field. This way, we can grow the schema when new type of information is saved within in which still keeping it backward compatible.
 - [HUDI-842] Fix non-partitioned case and speedup initial creation of metadata table.Choose only 1 partition for jsc as the number of records is low (hundreds to thousands). There is more overhead of creating large number of partitions for JavaRDD and it slows down operations like WorkloadProfile.
For the non-partitioned case, use "." as the name of the partition to prevent empty keys in HFile.
 - [HUDI-842] Reworked metrics pusblishing.
 - Code has been split into reader and writer side. HoodieMetadata code to be accessed by using HoodieTable.metadata() to get instance of metdata for the table.
Code is serializable to allow executors to use the functionality.
 - [RFC-15] Add metrics to track the time for each file system call.
 - [RFC-15] Added a distributed metrics registry for spark which can be used to collect metrics from executors. This helps create a stats dashboard which shows the metadata table improvements in real-time for production tables.
 - [HUDI-1321] Created HoodieMetadataConfig to specify configuration for the metadata table. This is safer than full-fledged properties for the metadata table (like HoodieWriteConfig) as it makes burdensome to tune the metadata. With limited configuration, we can control the performance of the metadata table closely.

[HUDI-1319][RFC-15] Adding interfaces for HoodieMetadata, HoodieMetadataWriter (apache#2266)
 - moved MetadataReader to HoodieBackedTableMetadata, under the HoodieTableMetadata interface
 - moved MetadataWriter to HoodieBackedTableMetadataWriter, under the HoodieTableMetadataWriter
 - Pulled all the metrics into HoodieMetadataMetrics
 - Writer now wraps the metadata, instead of extending it
 - New enum for MetadataPartitionType
 - Streamlined code flow inside HoodieBackedTableMetadataWriter w.r.t initializing metadata state
 - [HUDI-1319] Make async operations work with metadata table (apache#2332)
 - Changes the syncing model to only move over completed instants on data timeline
 - Syncing happens postCommit and on writeClient initialization
 - Latest delta commit on the metadata table is sufficient as the watermark for data timeline archival
 - Cleaning/Compaction use a suffix to the last instant written to metadata table, such that we keep the 1-1
 - .. mapping between data and metadata timelines.
 - Got rid of a lot of the complexity around checking for valid commits during open of base/log files
 - Tests now use local FS, to simulate more failure scenarios
 - Some failure scenarios exposed HUDI-1434, which is needed for MOR to work correctly

co-authored by: Vinoth Chandar <vinoth@apache.org>

[HUDI-1450] Use metadata table for listing in HoodieROTablePathFilter (apache#2326)

[HUDI-1394] [RFC-15] Use metadata table (if present) to get all partition paths (apache#2351)

[HUDI-1469] Faster initialization of metadata table using parallelized listing. (apache#2343)

 * [HUDI-1469] Faster initialization of metadata table using parallelized listing which finds partitions and files in a single scan.
 * MINOR fixes

Co-authored-by: Vinoth Chandar <vinoth@apache.org>

[HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (apache#2342)

[RFC-15] Fix partition key in metadata table when bootstrapping from file system (apache#2387)

Co-authored-by: Ryan Pifer <ryanpife@amazon.com>

[HUDI-1312] [RFC-15] Support for metadata listing for snapshot queries through Hive/SparkSQL (apache#2366)

Co-authored-by: Ryan Pifer <ryanpife@amazon.com>

[HUDI-1504] Allow log files generated during restore/rollback to be synced as well

 - TestHoodieBackedMetadata#testSync etc now run for MOR tables
 - HUDI-1502 is still pending and has issues for MOR/rollbacks
 - Also addressed bunch of code review comments.

[HUDI-1498] Read clustering plan from requested file for inflight instant (apache#2389)

[HUDI-1506] Fix wrong exception thrown in HoodieAvroUtils (apache#2405)

[HUDI-1383] Fixing sorting of partition vals for hive sync computation (apache#2402)

[HUDI-1507] Change timeline utils to support reading replacecommit metadata (apache#2407)

[MINOR] Rename unit test package of hudi-spark3 from scala to java (apache#2411)

[HUDI-1513] Introduce WriteClient#preWrite() and relocate metadata table syncing (apache#2413)

- Syncing to metadata table, setting operation type, starting async cleaner done in preWrite()
 - Fixes an issues where delete() was not starting async cleaner correctly
 - Fixed tests and enabled metadata table for TestAsyncCompaction

[HUDI-1510] Move HoodieEngineContext and its dependencies to hudi-common (apache#2410)

[MINOR] Sync HUDI-1196 to  FlinkWriteHelper (apache#2415)

[HUDI-1514] Avoid raw type use for parameter of Transformer interface (apache#2420)

[HUDI-920] Support Incremental query for MOR table (apache#1938)

[HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible (apache#2422)

* [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible

* Use filesystemview and json format from metadata. Add tests

Co-authored-by: Satish Kotha <satishkotha@uber.com>

[HUDI-1399] support a independent clustering spark job to asynchronously clustering (apache#2379)

* [HUDI-1481]  add  structured streaming and delta streamer clustering unit test

* [HUDI-1399] support a independent clustering spark job to asynchronously clustering

* [HUDI-1399]  support a  independent clustering spark job to asynchronously clustering

* [HUDI-1498] Read clustering plan from requested file for inflight instant (apache#2389)

* [HUDI-1399]  support  a independent clustering spark job with schedule generate instant time

Co-authored-by: satishkotha <satishkotha@uber.com>

[MINOR] fix spark 3 build for incremental query on MOR (apache#2425)

[HUDI-1479] Use HoodieEngineContext to parallelize fetching of partiton paths (apache#2417)

* [HUDI-1479] Use HoodieEngineContext to parallelize fetching of partition paths

* Adding testClass for FileSystemBackedTableMetadata

Co-authored-by: Nishith Agarwal <nagarwal@uber.com>

[HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (apache#2428)

[HUDI-1502] MOR rollback and restore support for metadata sync (apache#2421)

- Adds field to RollbackMetadata that capture the logs written for rollback blocks
- Adds field to RollbackMetadata that capture new logs files written by unsynced deltacommits

Co-authored-by: Vinoth Chandar <vinoth@apache.org>

Reviewers: O955 Project Hoodie Project Reviewer: Add blocking reviewers!, PHID-PROJ-pxfpotkfgkanblb3detq!, #ldap_hudi, modi

Reviewed By: #ldap_hudi, modi

Differential Revision: https://code.uberinternal.com/D5347141