[HUDI-5504] Fix concurrency conflict when asyncCompaction is enabled #7609
| if (asyncCompaction) { | ||
| synchronized (mutex) { | ||
| super.processLatencyMarker(latencyMarker); | ||
| } |
Can we just ignore the watermark and latency marker entirely? The compaction operators belong to the sink pipeline, so there is no need to propagate them downstream.
In my opinion, propagating them is more user-friendly because they can then be displayed in the Flink UI, and some specific functions may need this information. So I did not ignore the watermark and latency marker entirely.
I have now ignored the watermark and latency marker entirely. @danny0405
|
@hudi-bot re-run the last Azure build
| package org.apache.hudi.sink.compact; | ||
|
|
||
| import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; | ||
| import org.apache.hudi.client.HoodieFlinkWriteClient; |
| @Override | ||
| public void processWatermark(Watermark mark) { | ||
| public void processWatermark(Watermark mark) throws Exception { | ||
| // no need to propagate the watermark |
| @Override | ||
| public void processLatencyMarker(LatencyMarker latencyMarker) | ||
| throws Exception { | ||
| // no need to propagate the latencyMarker |
No need to throw exception.
ClusteringOperator should also be fixed.
| () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()), | ||
| (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)), | ||
| (errMsg, t) -> { | ||
| collector.collect(new CompactionCommitEvent(instantTime, |
| instantTime, maxInstantTime, | ||
| writeClient.getHoodieTable().getTaskContextSupplier()); | ||
| collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID)); | ||
| collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), |
|
@hudi-bot run azure |
…latency marker (apache#7609) Co-authored-by: coder_wang <leiw5658@gmail.com>
Change Logs
Fix concurrency conflict when asyncCompaction is enabled
Impact
When asyncCompaction is enabled, CompactTaskMainThread and compactionThread can conflict when they concurrently send the watermark and the CompactionCommitEvent downstream.
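A minimal sketch of the idea discussed in the review, in plain Java with no Flink or Hudi dependencies. All names here (`SharedOutputSketch`, `Output`, `CompactOperatorSketch`, `run`) are hypothetical stand-ins, not the classes changed in this PR. It assumes a downstream output that is not thread-safe, a task thread that receives elements, watermarks, and latency markers, and a single worker thread that emits commit events. Because the task thread drops the watermark and latency marker instead of forwarding them, only the worker thread ever writes to the output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedOutputSketch {

  /** Hypothetical stand-in for a downstream output that is not thread-safe. */
  static final class Output {
    private final List<String> records = new ArrayList<>();

    void collect(String record) {
      records.add(record);
    }

    List<String> records() {
      return records;
    }
  }

  /** Hypothetical operator: the caller is the task thread, the executor is the worker. */
  static final class CompactOperatorSketch {
    private final Output output;
    private final ExecutorService compactionThread = Executors.newSingleThreadExecutor();

    CompactOperatorSketch(Output output) {
      this.output = output;
    }

    void processElement(String fileId) {
      // The commit event is emitted asynchronously from the worker thread.
      compactionThread.execute(() -> output.collect("commit-event:" + fileId));
    }

    void processWatermark(long timestamp) {
      // Intentionally empty: not forwarding keeps the task thread off the output.
    }

    void processLatencyMarker(long markedTime) {
      // Intentionally empty for the same reason.
    }

    void close() {
      compactionThread.shutdown();
      try {
        if (!compactionThread.awaitTermination(10, TimeUnit.SECONDS)) {
          throw new IllegalStateException("worker did not finish in time");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
  }

  /** Feeds elements interleaved with watermarks and markers, then returns what was emitted. */
  static List<String> run(int elements) {
    Output output = new Output();
    CompactOperatorSketch operator = new CompactOperatorSketch(output);
    for (int i = 0; i < elements; i++) {
      operator.processElement("file-" + i);
      operator.processWatermark(i);
      operator.processLatencyMarker(i);
    }
    operator.close();
    return output.records();
  }

  public static void main(String[] args) {
    System.out.println(run(1000).size());
  }
}
```

If the watermark did have to be forwarded, as in the first revision of this PR, both threads would need to guard every write to the output with the same lock. Dropping the markers avoids that shared write path altogether.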
Risk level (write none, low medium or high below)
high
Documentation Update
Contributor's checklist