
[SUPPORT] [BUG] Duplicate fileID ??? from bucket ?? of partition found during the BucketStreamWriteFunction index bootstrap. #5330

Closed
Guanpx opened this issue Apr 15, 2022 · 15 comments
Labels
data-consistency phantoms, duplicates, write skew, inconsistent snapshot flink Issues related to flink index priority:critical production down; pipelines stalled; Need help asap. release-0.11.0

Comments

@Guanpx

Guanpx commented Apr 15, 2022

Describe the problem you faced

Using Flink 1.13, the bucket index, a COW table, and Hudi 0.11.0 (not the latest).

To Reproduce

Steps to reproduce the behavior:

  1. Start the Flink job.
  2. Cancel the Flink job.
  3. Repeat steps 1-2 a few times.
  4. Start the job again; the exception below occurs.

Environment Description

  • Hudi version : 0.11.0

  • Flink version : 1.13.2

  • Hadoop version : 3.0.0

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : no

Additional context

[screenshot: HDFS file listing showing a file with prefix 0000007-f0da alongside the other bucket files, which use prefix 0000007-b513-xxxx]

The file with prefix 0000007-f0da should have the same fileID as the earlier files for that bucket, which use the format 0000007-b513-xxxx.

Stacktrace

java.lang.RuntimeException: Duplicate fileID 00000007-????-????-????-40bee2bd5a70 from bucket 7 of partition  found during the BucketStreamWriteFunction index bootstrap.
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.lambda$bootstrapIndexIfNeed$1(BucketStreamWriteFunction.java:179)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.bootstrapIndexIfNeed(BucketStreamWriteFunction.java:173)
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.processElement(BucketStreamWriteFunction.java:123)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)

@Guanpx
Author

Guanpx commented Apr 15, 2022

cc @danny0405

@wxplovecc
Contributor

see #5185

@danny0405
Contributor

Thanks for the PR @wxplovecc, can you explain why the #5185 patch fixed the bug?

@danny0405
Contributor

@Guanpx Did you use the code of the latest master? The shuffle strategy determines that one bucket number is always handled by a single task, which means no two write tasks would write to the same bucket number, and within one write task there is a {bucket id} to {file id} mapping.

Do you guys have any idea how the error happens?
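For readers unfamiliar with the convention described above, here is a minimal sketch (hypothetical code, not Hudi's actual implementation; the class and method names are invented for illustration) of how a fileId in the bucket index encodes its bucket number in the first, zero-padded prefix segment, e.g. 00000007-b513-xxxx for bucket 7:

```java
// Hypothetical sketch of the {bucket id} -> {file id} convention:
// the prefix before the first '-' of a fileId is the zero-padded
// bucket number. Two distinct fileIds sharing the same prefix within
// one partition would therefore be a duplicate-bucket conflict.
public class BucketIdSketch {

    // Parse the bucket number from a fileId such as
    // "00000007-b513-4a1c-8f2e-40bee2bd5a70" -> 7.
    static int bucketIdFromFileId(String fileId) {
        return Integer.parseInt(fileId.substring(0, fileId.indexOf('-')));
    }

    public static void main(String[] args) {
        System.out.println(bucketIdFromFileId("00000007-b513-4a1c-8f2e-40bee2bd5a70"));
    }
}
```

Under this convention, a single write task owning bucket 7 should only ever see one fileId with the 00000007 prefix per partition, which is exactly the invariant the bootstrap check enforces.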

@Guanpx
Author

Guanpx commented Apr 19, 2022

I will try the latest master tomorrow and debug that code ~

@wxplovecc
Contributor

Thanks for the PR @wxplovecc , can you explain why the #5185 patch fixed the bug ?

The MOR table rollback did not delete the log files. If the job failed before the first successful commit, it left behind some invalid log files. PR #5185 reuses the leftover invalid fileId by scanning the leftover log files when the job restarts.
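The failure mode described above can be sketched as follows (a simplified, hypothetical model, not Hudi's actual code; names are invented): during index bootstrap, each write task builds a per-partition mapping from bucket id to fileId, and a second, different fileId arriving for an already-mapped bucket raises the error seen in this issue.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the bootstrap duplicate check: each
// (partition, bucket) pair may map to exactly one fileId. Leftover
// log files from an uncleaned rollback can surface a second fileId
// for the same bucket, triggering the RuntimeException.
public class BootstrapCheckSketch {
    static final Map<String, String> bucketToFileId = new HashMap<>();

    static void bootstrapFileId(String partition, int bucket, String fileId) {
        String key = partition + "/" + bucket;
        String existing = bucketToFileId.putIfAbsent(key, fileId);
        if (existing != null && !existing.equals(fileId)) {
            throw new RuntimeException("Duplicate fileID " + fileId
                + " from bucket " + bucket + " of partition " + partition
                + " found during the BucketStreamWriteFunction index bootstrap.");
        }
    }
}
```

In this model, the fix in #5185 corresponds to resolving the leftover fileId to the existing one during the scan instead of registering it as a second entry for the bucket.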

@yihua
Contributor

yihua commented Apr 28, 2022

@Guanpx have you tried the latest master and see if the fix solves the problem for you?

@yihua yihua added this to Awaiting Triage in GI Tracker Board via automation Apr 28, 2022
@yihua yihua added priority:critical production down; pipelines stalled; Need help asap. flink Issues related to flink index labels Apr 28, 2022
@yihua yihua moved this from Awaiting Triage to User Action in GI Tracker Board Apr 29, 2022
@nsivabalan
Contributor

@Guanpx: do you have any updates on this issue? If the latest master works, feel free to close out the GitHub issue.

@LinMingQiang
Contributor

see #5763

@nsivabalan
Contributor

Closing the issue since the linked PR has landed. Feel free to reopen or open a new issue if you are still facing issues.
Thanks!

GI Tracker Board automation moved this from User Action to Done Aug 28, 2022
@codope codope added data-consistency phantoms, duplicates, write skew, inconsistent snapshot release-0.11.0 labels Apr 27, 2023
@jarrodcodes

jarrodcodes commented May 20, 2023

Seeing this issue with Flink 1.15 and Hudi 0.12.3:

java.lang.RuntimeException: Duplicate fileId 00000002-fb1c-47ac-a203-397ffbbd9b91 from bucket 2 of partition dt=2022-11-21 found during the BucketStreamWriteFunction index bootstrap.
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.lambda$bootstrapIndexIfNeed$1(BucketStreamWriteFunction.java:162)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.bootstrapIndexIfNeed(BucketStreamWriteFunction.java:155)
	at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.processElement(BucketStreamWriteFunction.java:111)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.base/java.lang.Thread.run(Unknown Source)

Steps to reproduce and general setup the same as above.

Looking for any suggestions, thank you.

@danny0405
Contributor

Did you write to an existing table or a fresh new table?

@jarrodcodes

Did you write to an existing table or a fresh new table?

This was a fresh table. Flink had restarted a few times due to a memory issue; it looks like this occurred after that.

@danny0405
Contributor

Did you use the COW table or MOR?

@jarrodcodes

Did you use the COW table or MOR?

We are using COW.


No branches or pull requests

8 participants