
[HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing Efficiency #5416

Merged (58 commits) on Nov 3, 2022

Conversation

@zhangyue19921010 (Contributor) commented Apr 24, 2022

https://issues.apache.org/jira/browse/HUDI-3963
RFC design : #5567

Change Logs

Abstract

Add Lock-Free executor to improve hoodie writing throughput and optimize execution efficiency.
Disruptor documentation: https://lmax-exchange.github.io/disruptor/user-guide/index.html#_introduction

Background

Abstractly, Hudi's ingestion of upstream data (Kafka or S3 files) into the lake is a standard producer-consumer model. Currently, Hudi uses LinkedBlockingQueue as the message queue between producer and consumer.

However, this lock-based model may become the application's throughput bottleneck as data volume grows. Worse, even increasing the number of executors does little to improve throughput.

In other words, in some scenarios users hit a throughput bottleneck when writing data into Hudi that neither scaling up the physical hardware nor parameter tuning can solve.

Implementation

  1. Abstraction: first abstract the produce-consume process into three parts.
  • Producer interface: fills hoodie data into the inner message queue.
  • Executor interface: initializes, schedules and manages producers and consumers, and holds the inner message queue. The Executor controls the entire life cycle of data production and consumption.
  • Consumer interface: consumes data from the inner message queue and writes it to hudi files.
  2. Following the abstractions in part 1, use the lock-free Disruptor ring buffer as the inner message queue of the Executor.
  • Implement the Producer abstraction from part 1 and customize DisruptorBasedProducer, which consumes upstream data and publishes it to the RingBuffer.
  • Implement the Executor abstraction; control the whole life cycle through RingBuffer state and related APIs. Customize HoodieDaemonThreadFactory to control thread behavior.
  • Implement the Consumer abstraction; define DisruptorMessageHandler to control consumption and clear data in the Disruptor.
  3. New parameters (see the configuration example below).
  • hoodie.write.executor.type: selects the executor which orchestrates concurrent producers and consumers communicating through a message queue.
    The default value is BOUNDED_IN_MEMORY_EXECUTOR, which uses a bounded in-memory queue backed by LinkedBlockingQueue.
    Users can instead set DISRUPTOR_EXECUTOR, which uses the Disruptor as a lock-free message queue to gain better write performance.
    Note that DISRUPTOR_EXECUTOR is still an experimental feature.
  • hoodie.write.buffer.size: the size of the Disruptor executor's ring buffer; must be a power of 2.
  • hoodie.write.wait.strategy: the strategy employed for making the DisruptorExecutor wait on a cursor.
  4. Limitation: for now, the Disruptor executor only supports Spark insert and Spark bulk insert operations. Other operations such as Spark upsert are still in progress.

Also, note that this new Disruptor-based executor consumes more CPU and memory resources.
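As an illustration only, here is a hedged sketch of enabling the Disruptor executor through a Spark write, assuming the config keys exactly as listed above (they may have been renamed during review); the table name and paths are hypothetical, and the usual Hudi write options such as record key and precombine field are omitted for brevity:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class DisruptorExecutorWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("disruptor-executor-example").getOrCreate();
    Dataset<Row> df = spark.read().json("s3://my-bucket/input/");     // hypothetical input path

    df.write()
        .format("hudi")
        .option("hoodie.table.name", "my_table")                      // hypothetical table name
        .option("hoodie.datasource.write.operation", "bulk_insert")   // disruptor currently covers insert/bulk_insert only
        .option("hoodie.write.executor.type", "DISRUPTOR_EXECUTOR")   // executor type as named in this description
        .option("hoodie.write.buffer.size", "1024")                   // ring buffer size, must be a power of 2
        .option("hoodie.write.wait.strategy", "BLOCKING_WAIT")        // default disruptor wait strategy
        .mode(SaveMode.Append)
        .save("s3://my-bucket/hudi/my_table");                        // hypothetical base path
  }
}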

Rollout/Adoption Plan

The default executor remains BOUNDED_IN_MEMORY_EXECUTOR, which uses a bounded in-memory queue backed by LinkedBlockingQueue, same as today.
So there is no impact on existing users.

Test Plan

  1. Add UTs TestDisruptorMessageQueue and TestDisruptorExecutionInSpark to guard the logic above and validate data correctness.
  2. Add benchmark BoundInMemoryExecutorBenchmark comparing BoundInMemoryExecutor (old) with DisruptorExecutor (new option).

This patch has been tested in a dev environment and I think it's ready for review,
although it still needs more Javadocs.

Here is a local Spark bulk_insert benchmark with a simple schema, comparing the old BoundInMemoryExecutor against the new DisruptorExecutor and showing a 2.0X performance improvement :)

Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
COW Ingestion:                            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
BoundInMemory Executor                             5557           5607          70          0.2        5556.9       1.0X
Disruptor Executor                                 2758           2778          28          0.4        2757.7       2.0X


I also ran several performance tests recently, including bulk_insert with a simple schema (several columns) plus a small amount of data, and with a complex schema (dozens of columns) plus a large amount of data.

For the simple schema + small data case, as the benchmark above showed, we may see performance improvements ranging from 60% up to 2X.

For the complex schema + large data case (1000 partitions, 100,000,000 records, dozens of columns, and 20 r5.12xlarge instances), we see about a 20% performance improvement.

Another conclusion: the further data production outpaces data consumption, the more benefit we get from this PR, because in that scenario locking is the performance bottleneck.

For typical scenarios, we get about a 20% performance improvement.

Benchmarks in detail

Data size             Column count   Performance improvement
1,000,000 records     7              2X (local benchmark)
100,000,000 records   15             23% (real world)

Please read hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala for more details.

Impact

Writer-core module.

Risk level medium

Documentation Update

N/A.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

UPDATE:

From our experience, we have two kinds of Spark Streaming ingestion jobs:

  1. For aggregated tables, which have no unique hoodie keys, we use insert or bulk_insert + clustering for the Hudi ingestion.
  2. For raw data, we use the upsert operation.

More details of our performance test:
Hudi version: 0.10.1
Spark Version: 3.0.2 spark streaming
Records number per batch(max): 754932000
Schema: 18 columns

{
  "type": "record",
  "name": "f_order_sa_xxxx_hourly_hudi",
  "namespace": "tv.xxx.schemas",
  "fields": [
    {"name": "timestamp", "type": "long", "doc": "category=timestamp"},
    {"name": "network_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xx_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xx_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_id", "type": ["null", "int"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_id", "type": ["null", "int"], "default": null, "doc": "category=dimension"},
    {"name": "xxxx_visibility", "type": ["null", "string"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_owner_id", "type": ["null", "int"], "default": null, "doc": "category=dimension"},
    {"name": "sxxx_endpoint_id", "type": ["null", "int"], "default": null, "doc": "category=dimension"},
    {"name": "xxxx_order_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_source", "type": ["null", "int"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_type", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "content_xxx_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_publisher_id", "type": ["null", "string"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_order_id", "type": ["null", "long"], "default": null, "doc": "category=dimension"},
    {"name": "xxx_ad_views", "type": ["null", "long"], "default": null, "doc": "category=metric"},
    {"name": "revenue", "type": ["null", "double"], "default": null, "doc": "category=metric"}
  ]
}

Insert/bulk_insert performance benchmark between BIMQ (baseline) and Disruptor, with the same Kafka input, resources and configs.

BIMQ used 7.9 min to finish writing parquet files.

Disruptor used 5.6 min to finish writing parquet files.

In terms of Case 1 write performance, Disruptor improved throughput by about 29%, from 7.9 min to 5.6 min.

UPDATE 2:

Recently we adopted this Disruptor executor in a production Spark Streaming upsert pipeline, i.e. Case 2 mentioned above.

As for the writing stage performance, we get about a 25% improvement, reduced from 15s to 11s.

More details:
Spark Streaming batch interval: 30s
Records per batch(Avg): 1,361,453
Hudi version: 0.10.1
Spark Version: 3.0.2

BIMQ used 15s to finish writing parquet files.

Disruptor used 11s to finish writing parquet files.

@zhangyue19921010 zhangyue19921010 changed the title [HUDI-3963] [RFC-51] Improve Hoodie Writing Efficiency Using Lock-Free Message Queue [HUDI-3963] Improve Hoodie Writing Efficiency Using Lock-Free Message Queue Apr 27, 2022
@zhangyue19921010 (Contributor Author)

Hi @yihua @nsivabalan and @xushiyan Sorry to bother you. Do you have time to help review this PR?
Really really appreciate it. Thanks a lot :)

@yihua yihua self-assigned this Apr 27, 2022
@yihua yihua added the priority:critical (production down; pipelines stalled; need help asap) and writer-core (issues relating to core transactions/write actions) labels Apr 27, 2022
@yihua yihua added this to Under Discussion PRs in PR Tracker Board via automation Apr 27, 2022
@yihua yihua added the priority:major (degraded perf; unable to move forward; potential bugs) label and removed the priority:critical (production down; pipelines stalled; need help asap) label Apr 27, 2022
@zhangyue19921010 (Contributor Author)

@hudi-bot run azure

@alexeykudinkin (Contributor) left a comment

Great effort @zhangyue19921010!

We've really come a long way! Thanks for taking up the cleanup of the Executor hierarchy, really appreciate it!

@@ -132,6 +134,14 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` "
+ "extract a key out of incoming records.");

public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
.key("hoodie.write.executor.type")
.defaultValue("BOUNDED_IN_MEMORY")
Contributor

Let's use it here as well

/**
* Create a new hoodie executor instance on demand.
*/
public static HoodieExecutor create(HoodieWriteConfig hoodieConfig, Iterator inputItr, IteratorBasedQueueConsumer consumer,
Contributor

Let's make sure we're not dropping generics params

Contributor Author

All changed.


// collect all records and assert that consumed records are identical to produced ones
// assert there's no tampering, and that the ordering is preserved
assertEquals(hoodieRecords.size(), consumedRecords.size());
Contributor

We can actually just do assertEquals(oneList, anotherList)

Contributor Author

changed!

*/
public ExecutorCompletionService<Boolean> startProducers() {
public List<Future<Boolean>> startProducers() {
Contributor

Let's rebase all of our usage of Future onto CompletableFuture (which are much more capable and versatile than the former)

For that you just need to change how you trigger the execution doing:

CompletableFuture.runAsync(() -> ..., executorService);

Also, seems like this method is missing @Override
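For illustration, a minimal self-contained sketch of triggering producers via CompletableFuture.runAsync; the producers here are stand-in Runnables, not the PR's actual classes:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class CompletableFutureProducersSketch {
  public static void main(String[] args) {
    ExecutorService producerExecutorService = Executors.newFixedThreadPool(2);

    // Stand-ins for the PR's producers; each Runnable plays the role of one producer's produce() loop.
    List<Runnable> producers = Arrays.asList(
        () -> System.out.println("producer-0 publishing records"),
        () -> System.out.println("producer-1 publishing records"));

    // Trigger each producer via CompletableFuture.runAsync(...) instead of collecting raw Futures
    // from an ExecutorCompletionService.
    List<CompletableFuture<Void>> producing = producers.stream()
        .map(p -> CompletableFuture.runAsync(p, producerExecutorService))
        .collect(Collectors.toList());

    // Callers can compose or join on all producers directly.
    CompletableFuture.allOf(producing.toArray(new CompletableFuture[0])).join();
    producerExecutorService.shutdown();
  }
}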

Contributor Author

All changed.

this.consumer = consumer;
this.preExecuteRunnable = preExecuteRunnable;
// Ensure fixed thread for each producer thread
this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new HoodieDaemonThreadFactory("producer", preExecuteRunnable));
Contributor

nit: "executor-queue-producer", "executor-queue-consumer"

Contributor Author

Changed.

/**
* HoodieExecutor which orchestrates concurrent producers and consumers communicating through a bounded in message queue.
*/
public interface HoodieExecutor<I, O, E> {
Contributor

I'd suggest reserving this interface for APIs exposed to the actual user: in that case the only such APIs are execute and close. All the other APIs are implementation details (setup, startConsumers, postAction, etc.) and shouldn't be exposed outside the actual implementations:

  • Let's move all of these to a shared base class
  • Let's keep them "protected"

Let's also make this extend AutoCloseable

Contributor Author

Nice catch!
Kept only

E execute();
boolean isRemaining();
void shutdownNow();
boolean awaitTermination();

in public interface HoodieExecutor<I, O, E> extends Closeable; these are used to control the lifecycle of a HoodieExecutor, for example in SparkLazyInsertIterable#computeNext.

And moved all the other functions into HoodieExecutorBase, changing them to protected except startProducers and getQueue, which need to be called from outside.
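A hedged sketch of the resulting public surface after that change (signatures are illustrative, not the exact PR code):

import java.io.Closeable;

// Only lifecycle-facing APIs remain on the public interface; setup, startConsumers, postAction
// and similar methods move into an abstract HoodieExecutorBase as protected members.
public interface HoodieExecutor<I, O, E> extends Closeable {
  E execute();
  boolean isRemaining();
  void shutdownNow();
  boolean awaitTermination();
}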


package org.apache.hudi.common.util.queue;

public class DisruptorPublisher<I, O> {
Contributor

@zhangyue19921010 forgot to remove this class?


@Override
public Option<O> readNextRecord() {
// Let DisruptorMessageHandler to handle consuming logic.
Contributor

We should throw UnsupportedOperationException in here
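A hedged sketch of what that could look like, mirroring the readNextRecord signature quoted above; the message text is illustrative:

@Override
public Option<O> readNextRecord() {
  // Consumption is driven by DisruptorMessageHandler, so direct reads are not supported here.
  throw new UnsupportedOperationException("readNextRecord is not supported by the Disruptor-based queue");
}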

Contributor Author

All changed here.

return ringBuffer.getBufferSize() == ringBuffer.remainingCapacity();
}

public void setHandlers(IteratorBasedQueueConsumer consumer) {
Contributor

Should this be package-private?

Contributor Author

It needs to be used in TestDisruptorMessageQueue#testCompositeProducerRecordReading :<

Contributor

I don't think it's good practice to make methods public just for the purpose of testing. Can we please avoid that?

Contributor Author

Sure, will use Java reflection instead (a sketch of the pattern follows).
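A hedged sketch of the reflection pattern; class, method and variable names mirror the discussion but are illustrative:

import java.lang.reflect.Method;

// In the test, obtain the non-public setHandlers method and invoke it on the queue under test.
// 'DisruptorMessageQueue', 'IteratorBasedQueueConsumer', 'queue' and 'consumer' stand in for the
// PR's classes and the test's local instances; shown here only to illustrate the approach.
Method setHandlers = DisruptorMessageQueue.class.getDeclaredMethod("setHandlers", IteratorBasedQueueConsumer.class);
setHandlers.setAccessible(true);
setHandlers.invoke(queue, consumer);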

public void insertRecord(I value) throws Exception {
O applied = transformFunction.apply(value);

EventTranslator<HoodieDisruptorEvent> translator = new EventTranslator<HoodieDisruptorEvent>() {
Contributor

We can simplify this to

EventTranslator<HoodieDisruptorEvent> translator = (event, sequence) -> event.set(applied);
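For context, a hedged sketch of how such a translator is used when publishing to the LMAX ring buffer; 'applied' and 'ringBuffer' stand in for the PR's local variable and field:

// 'ringBuffer' is the Disruptor's RingBuffer<HoodieDisruptorEvent>; 'applied' is the transformed record.
EventTranslator<HoodieDisruptorEvent> translator = (event, sequence) -> event.set(applied);
ringBuffer.publishEvent(translator);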

Contributor Author

Sure thing. Changed!

@zhangyue19921010 (Contributor Author)

@hudi-bot run azure

@alexeykudinkin (Contributor)

@zhangyue19921010 please rebase on the latest master. Test failures you observe have already been addressed on master.

@zhangyue19921010 (Contributor Author)

@zhangyue19921010 please rebase on the latest master. Test failures you observe have already been addressed on master.

Hi @alexeykudinkin, thanks a lot for the reminder. Rebased onto master and all checks are green.
Also, the comments are addressed. PTAL :)

@alexeykudinkin (Contributor)

@zhangyue19921010 thank you very much for following up on all of the feedback!

We're waiting on @nsivabalan to give us a green-light from his end, and then we should be good to go

@zhangyue19921010 (Contributor Author) commented Oct 28, 2022

@zhangyue19921010 thank you very much for following up on all of the feedback!

We're waiting on @nsivabalan to give us a green-light from his end, and then we should be good to go

Really appreciate your efforts here! @alexeykudinkin and @nsivabalan

@nsivabalan (Contributor)

will review by tomorrow.

@nsivabalan (Contributor) left a comment

Mostly minor comments. Once addressed, we are good to land.

public static final ConfigProperty<String> WRITE_WAIT_STRATEGY = ConfigProperty
.key("hoodie.write.executor.disruptor.wait.strategy")
.defaultValue("BLOCKING_WAIT")
.withDocumentation("Strategy employed for making Disruptor Executor wait on a cursor.");
Contributor

Can we add the other possible values to the documentation?

Contributor Author

Sure thing. Added.
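For reference, the LMAX Disruptor library ships with several wait strategies. A hedged sketch of how config values could map onto them; all value names other than the BLOCKING_WAIT default are assumptions here:

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;

public class WaitStrategySketch {
  // Illustrative mapping from a config value to an LMAX Disruptor WaitStrategy.
  public static WaitStrategy build(String name) {
    switch (name) {
      case "BLOCKING_WAIT":  return new BlockingWaitStrategy();  // lock + condition variable, lowest CPU usage
      case "SLEEPING_WAIT":  return new SleepingWaitStrategy();  // spins, then yields, then parks briefly
      case "YIELDING_WAIT":  return new YieldingWaitStrategy();  // spins, then Thread.yield(), low latency
      case "BUSY_SPIN_WAIT": return new BusySpinWaitStrategy();  // pure busy spin, highest CPU usage
      default: throw new IllegalArgumentException("Unknown wait strategy: " + name);
    }
  }
}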

return Option.of(getString(WRITE_WAIT_STRATEGY));
}

public Option<Integer> getWriteBufferSize() {
Contributor

This is confusing with the already existing WRITE_BUFFER_LIMIT_BYTES_VALUE. Can we add units here? Initially I thought this was also in bytes, but it looks like it's the ring buffer size. We could also add "ring buffer" or "disruptor" to the name, since this is applicable only to the disruptor-based queue.

Contributor Author

Sure, documented the units and changed the name to getDisruptorWriteBufferSize().


HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(Option.of(16));
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
Contributor

Not required in this patch, but do you think we could have one set of tests and parameterize the two different queues? Looking to reuse code, because the way we test the bounded in-memory queue and the disruptor-based one is the same; only the initialization differs, but essentially it's functional testing of the queue.
Let me know wdyt.

Contributor Author

Nice idea, actually. We should keep the code as concise as possible.
https://issues.apache.org/jira/browse/HUDI-5106
Will work on it after this patch lands :)


private static final long TERMINATE_WAITING_TIME_SECS = 60L;
// Executor service used for launching write thread.
public final ExecutorService producerExecutorService;
Contributor

Can we try to use protected wherever applicable? Do we really need public for all these instance variables?

Contributor Author

Sure thing. Went through the whole PR and changed them wherever applicable.

try {
preExecuteRunnable.run();
Contributor

Previously, preExecuteRunnable was executed in both producers and consumers. With this patch, I see we are executing it only for consumers; is that intentional?
Also, I only see it getting executed in the BoundedInMemory impl and not in the Disruptor-based one. Can you check on that please?

Contributor Author

Actually, we need to run preExecuteRunnable in the producer and consumer threads of both the BoundedInMemory and Disruptor queues.

The original implementation was indeed a bit confusing, and it is simplified here: I expanded CustomizedThreadFactory so that it runs preExecuteRunnable in each new thread if it isn't null, and used this CustomizedThreadFactory to build both producerExecutorService and consumerExecutorService. That way we don't need to call preExecuteRunnable manually. (A minimal sketch of such a thread factory follows.)
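A minimal sketch of such a thread factory, assuming a name prefix and an optional pre-execute hook; names are illustrative, not the PR's exact class:

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonThreadFactorySketch implements ThreadFactory {
  private final String namePrefix;
  private final Runnable preExecuteRunnable;       // e.g. propagating the Spark TaskContext into the worker thread
  private final AtomicInteger counter = new AtomicInteger(0);

  public DaemonThreadFactorySketch(String namePrefix, Runnable preExecuteRunnable) {
    this.namePrefix = namePrefix;
    this.preExecuteRunnable = preExecuteRunnable;
  }

  @Override
  public Thread newThread(Runnable r) {
    Thread t = new Thread(() -> {
      if (preExecuteRunnable != null) {
        preExecuteRunnable.run();                  // run the hook once in the new thread before the real task
      }
      r.run();
    });
    t.setName(namePrefix + "-" + counter.getAndIncrement());
    t.setDaemon(true);
    return t;
  }
}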

Contributor

gotcha.

@Override
protected void postAction() {
try {
queue.close();
Contributor

In the case of shutdownNow, we shut down the executor services and then do queue.close(), whereas here we close the queue first, followed by shutting down the executor services.
May I know which is the right approach? Can we align them?

Contributor Author

Nice catch, changed.
We now align on shutting down the executor services first and then doing queue.close().


DISRUPTOR;

public static List<String> getNames() {
List<String> names = new ArrayList<>(KeyGeneratorType.values().length);
Contributor

Shouldn't this be ExecutorType.values()?

Contributor Author

Sure, changed!

/**
* Shutdown all the consumers and producers.
*/
public void shutdownNow() {
Contributor

Again, let's make these protected or package-private wherever applicable,
and let's not make a method public just for testing purposes.

Contributor Author

All changed. I went through all the code in this PR.
If anything was missed, please let me know. Thanks!

/**
* Get the size of inner message queue.
*/
long size();
Contributor

Minor: does this refer to the total size or the pending size? It's not apparent from the name.

@zhangyue19921010 (Contributor Author) commented Oct 31, 2022

Changed the Javadocs; it now reads "Returns the number of elements in this queue."
Really appreciate your review here :)

@hudi-bot

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@zhangyue19921010 (Contributor Author)

Hi @nsivabalan, thanks a lot for your review. All comments are addressed; please take a look :)

@zhangyue19921010 (Contributor Author)

Also, all the Azure jobs have passed now :)


@nsivabalan (Contributor)

Great job buddy for patiently addressing all comments !

@nsivabalan nsivabalan merged commit a5434b6 into apache:master Nov 3, 2022
PR Tracker Board automation moved this from Nearing Landing to Done Nov 3, 2022
@zhangyue19921010 (Contributor Author)

Great job buddy for patiently addressing all comments !

Thanks a lot for your help! @alexeykudinkin and @nsivabalan

alexeykudinkin pushed a commit that referenced this pull request Dec 15, 2022
… inner message queue (#7174)

Followed PR #5416

Add a new Executor Type named SIMPLE for hoodie records writer.

This Simple executor runs in single-writer, single-reader mode. This SimpleHoodieExecutor has no inner message queue and no inner lock; it consumes and writes records from the iterator directly.

Advantages: no additional memory or CPU resources are needed for locking or multithreading.

Disadvantages: loses some benefits such as rate limiting, and can no longer decouple the network read (shuffle read) from the network write (writing objects/files to storage), which may result in slightly lower throughput compared with DisruptorExecutor.

I also did a quick benchmark using the hoodie benchmark framework org.apache.spark.sql.execution.benchmark.BoundInMemoryExecutorBenchmark,

minimizing the impact of producer and consumer efficiency as much as possible to focus on testing the throughput limit of the inner queue.

The results are:

OpenJDK 64-Bit Server VM 1.8.0_342-b07 on Linux 5.10.62-55.141.amzn2.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
COW Ingestion:                            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
BoundInMemory Executor                            34661          35143         292          0.3        3466.1       1.0X
Simple Executor                                   17347          17796         681          0.6        1734.7       2.0X
Disruptor Executor                                15803          16535         936          0.6        1580.3       2.2X
This Simple Executor has good throughput and minimal resource usage.

Also add the corresponding UTs
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Apr 5, 2023
…iting Efficiency (apache#5416)

https://issues.apache.org/jira/browse/HUDI-3963
RFC design : apache#5567

Add Lock-Free executor to improve hoodie writing throughput and optimize execution efficiency.
Disruptor linked: https://lmax-exchange.github.io/disruptor/user-guide/index.html#_introduction. Existing BoundedInMemory is the default. Users can enable on a need basis. 

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Apr 5, 2023
… inner message queue (apache#7174)

Followed PR apache#5416

Add a new Executor Type named SIMPLE for hoodie records writer.

This Simple executor runs in single-writer, single-reader mode. This SimpleHoodieExecutor has no inner message queue and no inner lock; it consumes and writes records from the iterator directly.

Advantages: no additional memory or CPU resources are needed for locking or multithreading.

Disadvantages: loses some benefits such as rate limiting, and can no longer decouple the network read (shuffle read) from the network write (writing objects/files to storage), which may result in slightly lower throughput compared with DisruptorExecutor.

I also did a quick benchmark using the hoodie benchmark framework org.apache.spark.sql.execution.benchmark.BoundInMemoryExecutorBenchmark,

minimizing the impact of producer and consumer efficiency as much as possible to focus on testing the throughput limit of the inner queue.

The results are:

OpenJDK 64-Bit Server VM 1.8.0_342-b07 on Linux 5.10.62-55.141.amzn2.x86_64
Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
COW Ingestion:                            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
BoundInMemory Executor                            34661          35143         292          0.3        3466.1       1.0X
Simple Executor                                   17347          17796         681          0.6        1734.7       2.0X
Disruptor Executor                                15803          16535         936          0.6        1580.3       2.2X
This Simple Executor has good throughput and minimal resource usage.

Also add the corresponding UTs
vinishjail97 pushed a commit to vinishjail97/hudi that referenced this pull request Dec 15, 2023
* [HUDI-4282] Repair IOException in CHDFS when check block corrupted in HoodieLogFileReader (apache#6031)

Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>

* [HUDI-4757] Create pyspark examples (apache#6672)

* [HUDI-3959] Rename class name for spark rdd reader (apache#5409)

Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>

* [HUDI-4828] Fix the extraction of record keys which may be cut out (apache#6650)

Co-authored-by: yangshuo3 <yangshuo3@kingsoft.com>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>

* [HUDI-4873] Report number of messages to be processed via metrics (apache#6271)

Co-authored-by: Volodymyr Burenin <volodymyr.burenin@cloudkitchens.com>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>

* [HUDI-4870] Improve compaction config description (apache#6706)

* [HUDI-3304] Support partial update payload (apache#4676)


Co-authored-by: jian.feng <jian.feng@shopee.com>

* [HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in lo… (apache#6630)

* [HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in log file issue

Co-authored-by: xiaoxingstack <xiaoxingstack@didiglobal.com>

* [HUDI-4485] Bump spring shell to 2.1.1 in CLI (apache#6489)

Bumped spring shell to 2.1.1 and updated the default 
value for show fsview all `pathRegex` parameter.

* [minor] following 3304, some code refactoring (apache#6713)

* [HUDI-4832] Fix drop partition meta sync (apache#6662)

* [HUDI-4810] Fix log4j imports to use bridge API  (apache#6710)


Co-authored-by: dongsj <dongsj@asiainfo.com>

* [HUDI-4877] Fix org.apache.hudi.index.bucket.TestHoodieSimpleBucketIndex#testTagLocation not work correct issue (apache#6717)


Co-authored-by: xiaoxingstack <xiaoxingstack@didiglobal.com>

* [HUDI-4326] add updateTableSerDeInfo for HiveSyncTool (apache#5920)

- This pull request fix [SUPPORT] Hudi spark datasource error after migrate from 0.8 to 0.11 apache#5861*
- The issue is caused by after changing the table to spark data source table, the table SerDeInfo is missing. *

Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>

* [MINOR] fix indent to make build pass (apache#6721)

* [HUDI-3478] Implement CDC Write in Spark (apache#6697)

* [HUDI-4326] Fix hive sync serde properties (apache#6722)

* [HUDI-4875] Fix NoSuchTableException when dropping temporary view after applied HoodieSparkSessionExtension in Spark 3.2 (apache#6709)

* [DOCS] Improve the quick start guide for Kafka Connect Sink (apache#6708)

* [HUDI-4729] Fix file group pending compaction cannot be queried when query _ro table (apache#6516)

File group in pending compaction can not be queried 
when query _ro table with spark. This commit fixes that.

Co-authored-by: zhanshaoxiong <shaoxiong0001@@gmail.com>
Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>

* [HUDI-3983] Fix ClassNotFoundException when using hudi-spark-bundle to write table with hbase index (apache#6715)

* [HUDI-4758] Add validations to java spark examples (apache#6615)

* [HUDI-4792] Batch clean files to delete (apache#6580)

This  patch makes use of batch call to get fileGroup to delete during cleaning instead of 1 call per partition.
This limit the number of call to the view and should fix the trouble with metadata table in context of lot of partitions.
Fixes issue apache#6373

Co-authored-by: sivabalan <n.siva.b@gmail.com>

* [HUDI-4363] Support Clustering row writer to improve performance (apache#6046)

* [HUDI-3478][HUDI-4887] Use Avro as the format of persisted cdc data (apache#6734)

* [HUDI-4851] Fixing handling of `UTF8String` w/in `InSet` operator (apache#6739)


Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>

* [HUDI-3901] Correct the description of hoodie.index.type (apache#6749)

* [MINOR] Add .mvn directory to gitignore (apache#6746)

Co-authored-by: Rahil Chertara <rchertar@amazon.com>

* add support for unraveling proto schemas

* fix some compile issues

* [HUDI-4901] Add avro.version to Flink profiles (apache#6757)

* Add avro.version to Flink profiles

Co-authored-by: Shawn Chang <yxchang@amazon.com>

* [HUDI-4559] Support hiveSync command based on Call Produce Command (apache#6322)

* [HUDI-4883] Supporting delete savepoint for MOR (apache#6744)

Users could delete unnecessary savepoints 
and unblock archival for MOR table.

* [HUDI-4897] Refactor the merge handle in CDC mode (apache#6740)

* [HUDI-3523] Introduce AddColumnSchemaPostProcessor to support add columns to the end of a schema (apache#5031)

* Revert "[HUDI-3523] Introduce AddColumnSchemaPostProcessor to support add columns to the end of a schema (apache#5031)" (apache#6768)

This reverts commit 092375f.

* [HUDI-3523] Introduce AddPrimitiveColumnSchemaPostProcessor to support add new primitive column to the end of a schema (apache#6769)

* [HUDI-4903] Fix TestHoodieLogFormat`s minor typo (apache#6762)

* [MINOR] Drastically reducing concurrency level (to avoid CI flakiness) (apache#6754)

* Update HoodieIndex.java

Fix a typo

* [HUDI-4906] Fix the local tests for hudi-flink (apache#6763)

* [HUDI-4899] Fixing compatibility w/ Spark 3.2.2 (apache#6755)

* [HUDI-4892] Fix hudi-spark3-bundle (apache#6735)

* [MINOR] Fix a few typos in HoodieIndex (apache#6784)

Co-authored-by: xingjunwang <xingjunwang@tencent.com>

* [HUDI-4412] Fix multi writer INSERT_OVERWRITE NPE bug (apache#6130)

There are two minor issues fixed here:

1. When the insert_overwrite operation is performed, the 
    clusteringPlan in the requestedReplaceMetadata will be 
    null. Calling getFileIdsFromRequestedReplaceMetadata will cause NPE.

2. When insert_overwrite operation, inflightCommitMetadata!=null, 
    getOperationType should be obtained from getHoodieInflightReplaceMetadata,
    the original code will have a null pointer.

* [MINOR] retain avro's namespace (apache#6783)

* [MINOR] Simple logging fix in LockManager (apache#6765)

Co-authored-by: 苏承祥 <sucx@tuya.com>

* [HUDI-4433] hudi-cli repair deduplicate not working with non-partitioned dataset (apache#6349)

When using the repair deduplicate command with hudi-cli, 
there is no way to run it on the unpartitioned dataset, 
so modify the cli parameter.

Co-authored-by: Xingjun Wang <wongxingjun@126.com>

* [RFC-51][HUDI-3478] Update RFC: CDC support (apache#6256)

* [HUDI-4915] improve avro serializer/deserializer (apache#6788)

* [HUDI-3478] Implement CDC Read in Spark (apache#6727)

* naming and style updates

* [HUDI-4830] Fix testNoGlobalConfFileConfigured when add hudi-defaults.conf in default dir (apache#6652)

* make test data random, reuse code

* [HUDI-4760] Fixing repeated trigger of data file creations w/ clustering (apache#6561)

- Apparently in clustering, data file creations are triggered twice since we don't cache the write status and for doing some validation, we do isEmpty on JavaRDD which ended up retriggering the action. Fixing the double de-referencing in this patch.

* [HUDI-4914] Managed memory weight should be set when sort clustering is enabled (apache#6792)

* [HUDI-4910] Fix unknown variable or type "Cast" (apache#6778)

* [HUDI-4918] Fix bugs about when trying to show the non -existing key from env, NullPointException occurs. (apache#6794)

* [HUDI-4718] Add Kerberos kinit command support. (apache#6719)

* add test for 2 different recursion depths, fix schema cache key

* add unsigned long support

* better handle other types

* rebase on 4904

* get all tests working

* fix oneof expected schema, update tests after rebase

* [HUDI-4902] Set default partitioner for SIMPLE BUCKET index (apache#6759)

* [MINOR] Update PR template with documentation update (apache#6748)

* revert scala binary change

* try a different method to avoid avro version

* [HUDI-4904] Add support for unraveling proto schemas in ProtoClassBasedSchemaProvider (apache#6761)

If a user provides a recursive proto schema, it will fail when we write to parquet. We need to allow the user to specify how many levels of recursion they want before truncating the remaining data.

Main changes to existing code:

ProtoClassBasedSchemaProvider tracks number of times a message descriptor is seen within a branch of the schema traversal
once the number of times that descriptor is seen exceeds the user provided limit, set the field to preset record that will contain two fields: 1) the remaining data serialized as a proto byte array, 2) the descriptors full name for context about what is in that byte array
Converting from a proto to an avro now accounts for this truncation of the input

* delete unused file

* [HUDI-4907] Prevent single commit multi instant issue (apache#6766)


Co-authored-by: TengHuo <teng_huo@outlook.com>
Co-authored-by: yuzhao.cyz <yuzhao.cyz@gmail.com>

* [HUDI-4923] Fix flaky TestHoodieReadClient.testReadFilterExistAfterBulkInsertPrepped (apache#6801)



Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>

* [HUDI-4848] Fixing repair deprecated partition tool (apache#6731)

* [HUDI-4913] Fix HoodieSnapshotExporter for writing to a different S3 bucket or FS (apache#6785)

* address PR feedback, update decimal precision

* fix isNullable issue, check if class is Int64value

* checkstyle fix

* change wrapper descriptor set initialization

* add in testing for unsigned long to BigInteger conversion

* [HUDI-4453] Fix schema to include partition columns in bootstrap operation (apache#6676)

Turn off the type inference of the partition column to be consistent with 
existing behavior. Add notes around partition column type inference.

* [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (apache#4015)


Co-authored-by: huangjing02 <huangjing02@bilibili.com>
Co-authored-by: sivabalan <n.siva.b@gmail.com>

* [HUDI-4924] Auto-tune dedup parallelism (apache#6802)

* [HUDI-4687] Avoid setAccessible which breaks strong encapsulation (apache#6657)

Use JOL GraphLayout for estimating deep size.

* [MINOR] fixing validate async operations to poll completed clean instances (apache#6814)

* [HUDI-4734] Deltastreamer table config change validation (apache#6753)


Co-authored-by: sivabalan <n.siva.b@gmail.com>

* [HUDI-4934] Revert batch clean files (apache#6813)

* Revert "[HUDI-4792] Batch clean files to delete (apache#6580)"
This reverts commit cbf9b83.

* [HUDI-4722] Added locking metrics for Hudi (apache#6502)

* [HUDI-4936] Fix `as.of.instant` not recognized as hoodie config (apache#5616)


Co-authored-by: leon <leon@leondeMacBook-Pro.local>
Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>

* [HUDI-4861] Relaxing `MERGE INTO` constraints to permit limited casting operations w/in matched-on conditions (apache#6820)

* [HUDI-4885] Adding org.apache.avro to hudi-hive-sync bundle (apache#6729)

* [HUDI-4951] Fix incorrect use of Long.getLong() (apache#6828)

* [MINOR] Use base path URI in ITTestDataStreamWrite (apache#6826)

* [HUDI-4308] READ_OPTIMIZED read mode will temporary loss of data when compaction (apache#6664)

Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>

* [HUDI-4237] Fixing empty partition-values being sync'd to HMS (apache#6821)

Co-authored-by: dujunling <dujunling@bytedance.com>
Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>

* [HUDI-4925] Should Force to use ExpressionPayload in MergeIntoTableCommand (apache#6355)


Co-authored-by: jian.feng <jian.feng@shopee.com>

* [HUDI-4850] Add incremental source from GCS to Hudi (apache#6665)

Adds an incremental source from GCS based on a similar design 
as https://hudi.apache.org/blog/2021/08/23/s3-events-source

* [HUDI-4957] Shade JOL in bundles to fix NoClassDefFoundError:GraphLayout (apache#6839)

* [HUDI-4718] Add Kerberos kdestroy command support (apache#6810)

* [HUDI-4916] Implement change log feed for Flink (apache#6840)

* [HUDI-4769] Option read.streaming.skip_compaction skips delta commit (apache#6848)

* [HUDI-4949] optimize cdc read to avoid the problem of reusing buffer underlying the Row (apache#6805)

* [HUDI-4966] Add a partition extractor to handle partition values with slashes (apache#6851)

* [MINOR] Fix testUpdateRejectForClustering (apache#6852)

* [HUDI-4962] Move cloud dependencies to cloud modules (apache#6846)

* [HOTFIX] Fix source release validate script (apache#6865)

* [HUDI-4980] Calculate avg record size using commit only (apache#6864)

Calculate average record size for Spark upsert partitioner 
based on commit instants only. Previously it's based on 
commit and replacecommit, of which the latter may be 
created by clustering which has inaccurately smaller 
average record sizes, which could result in OOM 
due to size underestimation.

* shade protobuf dependency

* Revert "[HUDI-4915] improve avro serializer/deserializer (apache#6788)" (apache#6809)

This reverts commit 79b3e2b.

* [HUDI-4970] Update kafka-connect readme and refactor HoodieConfig#create (apache#6857)

* Enhancing README for multi-writer tests (apache#6870)

* [MINOR] Fix deploy script for flink 1.15 (apache#6872)

* [HUDI-4992] Fixing invalid min/max record key stats in Parquet metadata (apache#6883)

* Revert "shade protobuf dependency"

This reverts commit f03f961.

* [HUDI-4972] Fixes to make unit tests work on m1 mac (apache#6751)

* [HUDI-2786] Docker demo on mac aarch64 (apache#6859)

* [HUDI-4971] Fix shading kryo-shaded with reusing configs (apache#6873)

* [HUDI-3900] [UBER] Support log compaction action for MOR tables (apache#5958)

- Adding log compaction support to MOR table. subsequent log blocks can now be compacted into larger log blocks without needing to go for full compaction (by merging w/ base file). 
- New timeline action is introduced for the purpose. 

Co-authored-by: sivabalan <n.siva.b@gmail.com>

* Relocate apache http package (apache#6874)

* [HUDI-4975] Fix datahub bundle dependency (apache#6896)

* [HUDI-4999] Refactor FlinkOptions#allOptions and CatalogOptions#allOptions (apache#6901)

* [MINOR] Update GitHub setting for merge button (apache#6922)

Only allow squash and merge. Disable merge and rebase

* [HUDI-4993] Make DataPlatform name and Dataset env configurable in DatahubSyncTool (apache#6885)

* [MINOR] Fix name spelling for RunBootstrapProcedure

* [HUDI-4754] Add compliance check in github actions (apache#6575)

* [HUDI-4963] Extend InProcessLockProvider to support multiple table ingestion (apache#6847)


Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>

* [HUDI-4994] Fix bug that prevents re-ingestion of soft-deleted Datahub entities (apache#6886)

* Implement Create/Drop/Show/Refresh Secondary Index (apache#5933)

* [MINOR] Moved readme from  .github to the workflows folder (apache#6932)

* [HUDI-4952] Fixing reading from metadata table when there are no inflight commits (apache#6836)

* Fixing reading from metadata table when there are no inflight commits
* Fixing reading from metadata if not fully built out
* addressing minor comments
* fixing sql conf and options interplay
* addressing minor refactoring

* [HUDI-1575][RFC-56] Early Conflict Detection For Multi-writer (apache#6003)

Co-authored-by: yuezhang <yuezhang@yuezhang-mac.freewheelmedia.net>
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>

* [HUDI-5006] Use the same wrapper for timestamp type metadata for parquet and log files (apache#6918)

Before this patch, for timestamp type, we use LongWrapper for parquet and TimestampMicrosWrapper for avro log,
they may keep different precision val here, for example, with timestamp(3), LongWrapper keeps the val as a millisecond long from EPOCH instant,
while TimestampMicrosWrapper keeps the val as micro-seconds.

For spark, it uses micro-seconds internally for timestamp type value, while flink uses the TimestampData internally,
we better keeps the same precision for better compatibility here.

* [HUDI-5016] Flink clustering does not reserve commit metadata (apache#6929)

* [HUDI-3900] Fixing hdfs setup and tear down in tests to avoid flakiness (apache#6912)

* [HUDI-5002] Remove deprecated API usage in SparkHoodieHBaseIndex#generateStatement (apache#6909)

Co-authored-by: slfan1989 <louj1988@@>

* [HUDI-5010] Fix flink hive catalog external config not work (apache#6923)

* fix flink catalog external config not work

* [HUDI-4948] Improve CDC Write (apache#6818)

* improve cdc write to support multiple log files
* update: use map to store the cdc stats

* [HUDI-5030] Fix TestPartialUpdateAvroPayload.testUseLatestRecordMetaValue(apache#6948)

* [HUDI-5033] Fix Broken Link In MultipleSparkJobExecutionStrategy (apache#6951)

Co-authored-by: slfan1989 <louj1988@@>

* [HUDI-5037] Upgrade org.apache.thrift:libthrift to 0.14.0 (apache#6941)

* [MINOR] Fixing verbosity of docker set up (apache#6944)

* [HUDI-5022] Make better error messages for pr compliance (apache#6934)

* [HUDI-5003] Fix the type of InLineFileSystem`startOffset to long (apache#6916)

* [HUDI-4855] Add missing table configs for bootstrap in Deltastreamer (apache#6694)

* [MINOR] Handling null event time (apache#6876)

* [MINOR] Update DOAP with 0.12.1 Release (apache#6988)

* [MINOR] Increase maxParameters size in scalastyle (apache#6987)

* [HUDI-3900] Closing resources in TestHoodieLogRecord (apache#6995)

* [MINOR] Test case for hoodie.merge.allow.duplicate.on.inserts (apache#6949)

* [HUDI-4982] Add validation job for spark bundles in GitHub Actions (apache#6954)

* [HUDI-5041] Fix lock metric register confict error (apache#6968)

Co-authored-by: hbg <bingeng.huang@shopee.com>

* [HUDI-4998] Infer partition extractor class first from meta sync partition fields (apache#6899)

* [HUDI-4781] Allow omit metadata fields for hive sync (apache#6471)


Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>

* [HUDI-4997] Use jackson-v2 import instead of jackson-v1 (apache#6893)



Co-authored-by: slfan1989 <louj1988@@>

* [HUDI-3900] Fixing tempDir usage in TestHoodieLogFormat (apache#6981)

* [HUDI-4995] Relocate httpcomponents (apache#6906)

* [MINOR] Update GitHub setting for branch protection (apache#7008)

- require at least 1 approving review

* [HUDI-4960] Upgrade jetty version for timeline server (apache#6844)

Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>

* [HUDI-5046] Support all the hive sync options for flink sql (apache#6985)

* [MINOR] fix cdc flake ut (apache#7016)

* [MINOR] Remove redundant space in PR compliance check (apache#7022)

* [HUDI-5063] Enabling run time stats to be serialized with commit metadata (apache#7006)

* [HUDI-5070] Adding lock provider to testCleaner tests since async cleaning is invoked (apache#7023)

* [HUDI-5070] Move flaky cleaner tests to separate class (apache#7034)

* [HUDI-4971] Remove direct use of kryo from `SerDeUtils` (apache#7014)



Co-authored-by: Alexey Kudinkin <alexey@infinilake.com>

* [HUDI-5081] Tests clean up in hudi-utilities (apache#7033)

* [HUDI-5027] Replace hardcoded hbase config keys with constant variables  (apache#6946)

* [MINOR] add commit_action output in show_commits (apache#7012)

Co-authored-by: 苏承祥 <sucx@tuya.com>

* [HUDI-5061] bulk insert operation don't throw other exception except IOE Exception (apache#7001)

Co-authored-by: liufangqi.chenfeng <liufangqi.chenfeng@BYTEDANCE.COM>

* [MINOR] Skip loading last completed txn for single writer (apache#6660)


Co-authored-by: sivabalan <n.siva.b@gmail.com>

* [HUDI-4281] Using hudi to build a large number of tables in spark on hive causes OOM (apache#5903)

* [HUDI-5042] Fix clustering schedule problem in flink when enable schedule clustering and disable async clustering (apache#6976)

Co-authored-by: hbg <bingeng.huang@shopee.com>

* [HUDI-4753] more accurate record size estimation for log writing and spillable map (apache#6632)

* [HUDI-4201] Cli tool to get warned about empty non-completed instants from timeline (apache#6867)

* [HUDI-5038] Increase default num_instants to fetch for incremental source (apache#6955)

* [HUDI-5049] Supports dropPartition for Flink catalog (apache#6991)

* for both dfs and hms catalogs

* [HUDI-4809] glue support drop partitions (apache#7007)

Co-authored-by: xxhua <xxhua@freewheel.tv>

* [HUDI-5057] Fix msck repair hudi table (apache#6999)

* [HUDI-4959] Fixing Avro's `Utf8` serialization in Kryo (apache#7024)

* temp_view_support (apache#6990)

Co-authored-by: 苏承祥 <sucx@tuya.com>

* [HUDI-4982] Add Utilities and Utilities Slim + Spark Bundle testing to GH Actions (apache#7005)


Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>

* [HUDI-5085]When a flink job has multiple sink tables, the index loading status is abnormal (apache#7051)

* [HUDI-5089] Refactor HoodieCommitMetadata deserialization (apache#7055)

* [HUDI-5058] Fix flink catalog read spark table error : primary key col can not be nullable (apache#7009)

* [HUDI-5087] Fix incorrect merging sequence for Column Stats Record in `HoodieMetadataPayload` (apache#7053)

* [HUDI-5087]Fix incorrect maxValue getting from metatable

[HUDI-5087]Fix incorrect maxValue getting from metatable

* Fixed `HoodieMetadataPayload` merging seq;
Added test

* Fixing handling of deletes;
Added tests for handling deletes;

* Added tests for combining partition files-list record

Co-authored-by: Alexey Kudinkin <alexey@infinilake.com>

* [HUDI-4946] fix merge into with no preCombineField having dup row by only insert (apache#6824)

* [HUDI-5072] Extract `ExecutionStrategy#transform` duplicate code (apache#7030)

* [HUDI-3287] Remove hudi-spark dependencies from hudi-kafka-connect-bundle (apache#6079)

* [HUDI-5000] Support schema evolution for Hive/presto (apache#6989)

Co-authored-by: z00484332 <zhaolong36@huawei.com>

* [HUDI-4716] Avoid parquet-hadoop-bundle in hudi-hadoop-mr (apache#6930)

* [HUDI-5035] Remove usage of deprecated HoodieTimer constructor (apache#6952)

Co-authored-by: slfan1989 <louj1988@@>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>

* [HUDI-5083]Fixed a bug when schema evolution (apache#7045)

* [HUDI-5102] source operator(monitor and reader) support user uid  (apache#7085)

* Update HoodieTableSource.java

Co-authored-by: chenzhiming <chenzhm@chinatelecom.cn>

* [HUDI-5057] Fix msck repair external hudi table (apache#7084)

* [MINOR] Fix typos in Spark client related classes (apache#7083)

* [HUDI-4741] hotfix to avoid partial failover cause restored subtask timeout (apache#6796)

Co-authored-by: jian.feng <jian.feng@shopee.com>

* [MINOR] use default maven version since it already fix the warnings recently (apache#6863)

Co-authored-by: jian.feng <jian.feng@shopee.com>

* Revert "[HUDI-4741] hotfix to avoid partial failover cause restored subtask timeout (apache#6796)" (apache#7090)

This reverts commit e222693.

* [MINOR] Fix doc of org.apache.hudi.sink.meta.CkpMetadata#bootstrap (apache#7048)

Co-authored-by: xiaoxingstack <xiaoxingstack@didiglobal.com>

* [HUDI-4799] improve analyzer exception tip when cannot resolve expression (apache#6625)

* [HUDI-5096] Upgrade jcommander to 1.78 (apache#7068)

- resolves security vulnerability
- resolves NPE issues with HiveSyncTool args parsing

Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>

* [HUDI-5105] Add Call show_commit_extra_metadata for spark sql (apache#7091)

* [HUDI-5105] Add Call show_commit_extra_metadata for spark sql

* [HUDI-5107] Fix hadoop config in DirectWriteMarkers, HoodieFlinkEngineContext and StreamerUtil are not consistent issue (apache#7094)

Co-authored-by: xiaoxingstack <xiaoxingstack@didiglobal.com>

* [MINOR] Fix OverwriteWithLatestAvroPayload full class name (apache#7096)

* [HUDI-5074] Warn if table for metastore sync has capitals in it (apache#7077)


Co-authored-by: Jonathan Vexler <=>

* [HUDI-5124] Fix HoodieInternalRowFileWriter#canWrite error return tag. (apache#7107)

Co-authored-by: slfan1989 <louj1988@@>

* [MINOR] update commons-codec:commons-codec 1.4 to 1.13 (apache#6959)

* [HUDI-5148] Claim RFC-63 for Index on Function and Logical Partitioning (apache#7114)

* [HUDI-5065] Call close on SparkRDDWriteClient in HoodieCleaner (apache#7101)



Co-authored-by: Jonathan Vexler <=>

* [HUDI-4624] Implement Closable for S3EventsSource (apache#7086)


Co-authored-by: Jonathan Vexler <=>

* [HUDI-5045] Adding support to configure index type with integ tests (apache#6982)

Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>

* [HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing Efficiency  (apache#5416)

https://issues.apache.org/jira/browse/HUDI-3963
RFC design : apache#5567

Add Lock-Free executor to improve hoodie writing throughput and optimize execution efficiency.
Disruptor linked: https://lmax-exchange.github.io/disruptor/user-guide/index.html#_introduction. Existing BoundedInMemory is the default. Users can enable on a need basis. 

Co-authored-by: yuezhang <yuezhang@freewheel.tv>

* [HUDI-5076] Fixing non serializable path used in engineContext with metadata table intialization (apache#7036)

* [HUDI-5032] Add archive to cli (apache#7076)

Adding archiving capability to cli.

Co-authored-by: Jonathan Vexler <=>

* [HUDI-4880] Fix corrupted parquet file issue left over by cancelled compaction task (apache#6733)

* [HUDI-5147] Flink data skipping doesn't work when HepPlanner calls copy()… (apache#7113)

* [HUDI-5147] Flink data skipping doesn't work when HepPlanner calls copy() on HoodieTableSource

* [MINOR] Fixing broken test (apache#7123)

* [HUDI-4898] presto/hive respect payload during merge parquet file and logfile when reading mor table (apache#6741)

* [HUDI-4898] presto/hive respect payload during merge parquet file and logfile when reading mor table

* Update HiveAvroSerializer.java otherwise payload string type combine field will cause cast exception

* [HUDI-5126] Delete duplicate configuration items PAYLOAD_CLASS_NAME (apache#7103)

* [HUDI-4989] Fixing deltastreamer init failures (apache#6862)

Fixing handling missing hoodie.properties

* [MINOR] Fix flaky test in ITTestHoodieDataSource (apache#7134)

* [HUDI-4071] Remove default value for mandatory record key field (apache#6681)
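
  Since the record key field no longer has an implicit default, it has to be configured explicitly on every write; a minimal sketch with hypothetical field, table, and path names:

  ```scala
  import org.apache.spark.sql.{SaveMode, SparkSession}

  // Sketch only: local session with Kryo serialization for Hudi writes.
  val spark = SparkSession.builder()
    .appName("explicit-record-key-sketch")
    .master("local[*]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

  val orders = spark.range(0, 10).selectExpr(
    "id as order_id",
    "current_timestamp() as ts",
    "'us' as region")

  orders.write.format("hudi")
    .option("hoodie.table.name", "orders")
    .option("hoodie.datasource.write.recordkey.field", "order_id") // now mandatory to set explicitly
    .option("hoodie.datasource.write.partitionpath.field", "region")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .mode(SaveMode.Append)
    .save("/tmp/hudi/orders")
  ```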

* [HUDI-5088] Fix bug: Failed to synchronize the Hive metadata of the Flink table (apache#7056)

* sync `_hoodie_operation` meta field if changelog mode is enabled.

* [MINOR] Removing spark2 scala12 combinations from readme (apache#7112)

* [HUDI-5153] Fix the write token name resolution of cdc log file (apache#7128)

* [HUDI-5066] Support flink hoodie source metaclient cache (apache#7017)

* [HUDI-5132] Add hadoop-mr bundle validation (apache#7157)

* [HUDI-2673] Add kafka connect bundle to validation test (apache#7131)

* [HUDI-5082] Improve the cdc log file name format (apache#7042)

* [HUDI-5154] Improve hudi-spark-client lambda writing (apache#7127)

Co-authored-by: slfan1989 <louj1988@@>

* [HUDI-5178] Add Call show_table_properties for spark sql (apache#7161)

* [HUDI-5067] Merge the columns stats of multiple log blocks from the same log file (apache#7018)

* [HUDI-5025] Rollback failed with log file not found when rollOver in rollback process (apache#6939)

* fix rollback file not found

* [HUDI-4526] Improve spillableMapBasePath when disk directory is full (apache#6284)

* [MINOR] Refactor the code for CkpMetadata (apache#7166)

* [HUDI-5111] Improve integration test coverage (apache#7092)



Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>

* [HUDI-5187] Remove the preCondition check of BucketAssigner assign state (apache#7170)

* [HUDI-5145] Avoid starting HDFS in hudi-utilities tests (apache#7171)

* [MINOR] Performance improvement of flink ITs with reused miniCluster (apache#7151)

* implement MiniCluster extension compatible with junit5

* Make local build work

* Delete files removed in OSS

* Fix bug in testing

* Upgrade to version release-v0.10.0

Co-authored-by: 5herhom <543872547@qq.com>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
Co-authored-by: Jon Vexler <jon@onehouse.ai>
Co-authored-by: simonsssu <barley0806@gmail.com>
Co-authored-by: y0908105023 <283999377@qq.com>
Co-authored-by: yangshuo3 <yangshuo3@kingsoft.com>
Co-authored-by: Volodymyr Burenin <vburenin@gmail.com>
Co-authored-by: Volodymyr Burenin <volodymyr.burenin@cloudkitchens.com>
Co-authored-by: 冯健 <fengjian428@gmail.com>
Co-authored-by: jian.feng <jian.feng@shopee.com>
Co-authored-by: FocusComputing <xiaoxingstack@gmail.com>
Co-authored-by: xiaoxingstack <xiaoxingstack@didiglobal.com>
Co-authored-by: Paul Zhang <xzhangyao@126.com>
Co-authored-by: Danny Chan <yuzhao.cyz@gmail.com>
Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
Co-authored-by: eric9204 <90449228+eric9204@users.noreply.github.com>
Co-authored-by: dongsj <dongsj@asiainfo.com>
Co-authored-by: Kyle Zhike Chen <zk.chan007@gmail.com>
Co-authored-by: Yann Byron <biyan900116@gmail.com>
Co-authored-by: Shiyan Xu <2701446+xushiyan@users.noreply.github.com>
Co-authored-by: dohongdayi <dohongdayi@126.com>
Co-authored-by: shaoxiong.zhan <31836510+microbearz@users.noreply.github.com>
Co-authored-by: zhanshaoxiong <shaoxiong0001@@gmail.com>
Co-authored-by: Manu <36392121+xicm@users.noreply.github.com>
Co-authored-by: Nicolas Paris <nicolas.paris@riseup.net>
Co-authored-by: sivabalan <n.siva.b@gmail.com>
Co-authored-by: RexAn <bonean131@gmail.com>
Co-authored-by: Alexey Kudinkin <alexey@infinilake.com>
Co-authored-by: Rahil C <32500120+rahil-c@users.noreply.github.com>
Co-authored-by: Rahil Chertara <rchertar@amazon.com>
Co-authored-by: Timothy Brown <tim@onehouse.ai>
Co-authored-by: Shawn Chang <42792772+CTTY@users.noreply.github.com>
Co-authored-by: Shawn Chang <yxchang@amazon.com>
Co-authored-by: ForwardXu <forwardxu315@gmail.com>
Co-authored-by: wangxianghu <wangxianghu@apache.org>
Co-authored-by: wulei <wulei.1023@bytedance.com>
Co-authored-by: Xingjun Wang <wongxingjun@126.com>
Co-authored-by: Prasanna Rajaperumal <prasanna.raj@live.com>
Co-authored-by: xingjunwang <xingjunwang@tencent.com>
Co-authored-by: liujinhui <965147871@qq.com>
Co-authored-by: 苏承祥 <scx_white@aliyun.com>
Co-authored-by: 苏承祥 <sucx@tuya.com>
Co-authored-by: ChanKyeong Won <brightwon.dev@gmail.com>
Co-authored-by: Zouxxyy <zouxxyy@qq.com>
Co-authored-by: Nicholas Jiang <programgeek@163.com>
Co-authored-by: KnightChess <981159963@qq.com>
Co-authored-by: Forus <70357858+Forus0322@users.noreply.github.com>
Co-authored-by: voonhous <voonhousu@gmail.com>
Co-authored-by: TengHuo <teng_huo@outlook.com>
Co-authored-by: hj2016 <hj3245459@163.com>
Co-authored-by: huangjing02 <huangjing02@bilibili.com>
Co-authored-by: jsbali <jsbali@uber.com>
Co-authored-by: Leon Tsao <31072303+gnailJC@users.noreply.github.com>
Co-authored-by: leon <leon@leondeMacBook-Pro.local>
Co-authored-by: 申胜利 <48829688+shenshengli@users.noreply.github.com>
Co-authored-by: aiden.dong <782112163@qq.com>
Co-authored-by: dujunling <dujunling@bytedance.com>
Co-authored-by: Pramod Biligiri <pramodbiligiri@gmail.com>
Co-authored-by: Zouxxyy <zouxinyu.zxy@alibaba-inc.com>
Co-authored-by: Alexey Kudinkin <alexey.kudinkin@gmail.com>
Co-authored-by: Surya Prasanna <syalla@uber.com>
Co-authored-by: Rajesh Mahindra <76502047+rmahindra123@users.noreply.github.com>
Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>
Co-authored-by: huberylee <shibei.lh@foxmail.com>
Co-authored-by: YueZhang <69956021+zhangyue19921010@users.noreply.github.com>
Co-authored-by: yuezhang <yuezhang@yuezhang-mac.freewheelmedia.net>
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
Co-authored-by: slfan1989 <55643692+slfan1989@users.noreply.github.com>
Co-authored-by: slfan1989 <louj1988@@>
Co-authored-by: 吴祥平 <408317717@qq.com>
Co-authored-by: wangzeyu <hameizi369@gmail.com>
Co-authored-by: vvsd <40269480+vvsd@users.noreply.github.com>
Co-authored-by: Zhaojing Yu <yuzhaojing@bytedance.com>
Co-authored-by: Bingeng Huang <304979636@qq.com>
Co-authored-by: hbg <bingeng.huang@shopee.com>
Co-authored-by: that's cool <1059023054@qq.com>
Co-authored-by: liufangqi.chenfeng <liufangqi.chenfeng@BYTEDANCE.COM>
Co-authored-by: Yuwei XIAO <ywxiaozero@gmail.com>
Co-authored-by: gavin <zhangrenhuaman@163.com>
Co-authored-by: Jon Vexler <jbvexler@gmail.com>
Co-authored-by: Xixi Hua <smilecrazy1h@gmail.com>
Co-authored-by: xxhua <xxhua@freewheel.tv>
Co-authored-by: YangXiao <919869387@qq.com>
Co-authored-by: chao chen <59957056+waywtdcc@users.noreply.github.com>
Co-authored-by: Zhangshunyu <zhangshunyu1990@126.com>
Co-authored-by: Long Zhao <294514940@qq.com>
Co-authored-by: z00484332 <zhaolong36@huawei.com>
Co-authored-by: 矛始 <1032851561@qq.com>
Co-authored-by: chenzhiming <chenzhm@chinatelecom.cn>
Co-authored-by: lvhu-goodluck <81349721+lvhu-goodluck@users.noreply.github.com>
Co-authored-by: alberic <cnuliuweiren@gmail.com>
Co-authored-by: lxxyyds <114218541+lxxawfl@users.noreply.github.com>
Co-authored-by: Alexander Trushev <42293632+trushev@users.noreply.github.com>
Co-authored-by: xiarixiaoyao <mengtao0326@qq.com>
Co-authored-by: windWheel <1817802738@qq.com>
Co-authored-by: Alexander Trushev <trushev.alex@gmail.com>
Co-authored-by: Shizhi Chen <107476116+chenshzh@users.noreply.github.com>