
[SUPPORT] java.lang.IllegalStateException: Duplicate key Option #4227

Closed
yanenze opened this issue Dec 6, 2021 · 13 comments
Labels
priority:major degraded perf; unable to move forward; potential bugs

Comments

@yanenze
Contributor

yanenze commented Dec 6, 2021

When I run a Flink sink-to-Hudi program, this problem occurs.

The stack trace looks like this:

org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'hoodie_stream_write' (operator f1d7c56f4bf5fc204e4401416e5b3884).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:557)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:170)
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:103)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [initialize instant ] error
... 5 more
Caused by: java.lang.IllegalStateException: Duplicate key Option{val=org.apache.hudi.common.HoodiePendingRollbackInfo@49b657cc}
at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1254)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.hudi.client.AbstractHoodieWriteClient.getPendingRollbackInfos(AbstractHoodieWriteClient.java:880)
at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:897)
at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:887)
at org.apache.hudi.client.AbstractHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(AbstractHoodieWriteClient.java:780)
at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:143)
at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:779)
at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:772)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.startInstant(StreamWriteOperatorCoordinator.java:334)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$initInstant$5(StreamWriteOperatorCoordinator.java:361)
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:93)
... 3 more

My hudi-flink-bundle version is 0.10.0 and my Flink version is 1.13.1.

@yanenze
Contributor Author

yanenze commented Dec 9, 2021

I may have resolved this problem by modifying AbstractHoodieWriteClient.getPendingRollbackInfos like this:

protected Map&lt;String, Option&lt;HoodiePendingRollbackInfo&gt;&gt; getPendingRollbackInfos(@NotNull HoodieTableMetaClient metaClient) {
  return metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().map(
      entry -> {
        try {
          HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, entry);
          LOG.info("Data rollback " + rollbackPlan.getInstantToRollback().getCommitTime() + ":" + entry.toString());
          return Pair.of(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(entry, rollbackPlan)));
        } catch (IOException e) {
          throw new HoodieIOException("Fetching rollback plan failed for " + entry, e);
        }
      } // modified from the original source
  ).filter(distinctByKey(Pair::getKey)).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

// modified from the original source: keep only the first element seen for each key
@NotNull
@Contract(pure = true)
static &lt;T&gt; Predicate&lt;T&gt; distinctByKey(Function&lt;? super T, ?&gt; keyExtractor) {
  Map&lt;Object, Boolean&gt; seen = new ConcurrentHashMap&lt;&gt;();
  return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
}

I have run my program for three days and the problem hasn't occurred.
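The failure in the stack trace comes from the two-argument form of `Collectors.toMap`, which throws `IllegalStateException` when two stream elements map to the same key. The sketch below is a minimal, self-contained illustration (the class name `DuplicateKeyDemo` and the sample commit-time keys are hypothetical, not from Hudi): it reproduces the exception and then shows the three-argument `toMap` overload, whose merge function has the same keep-the-first-entry effect as the `distinctByKey` filter in the patch above.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {
    public static void main(String[] args) {
        // Hypothetical stand-in for the pending-rollback pairs: two rollback
        // plans that point at the same commit time, i.e. a duplicate key.
        List<Map.Entry<String, String>> pairs = List.of(
                Map.entry("20211206", "rollback-A"),
                Map.entry("20211206", "rollback-B"));

        try {
            // The two-arg Collectors.toMap uses a throwing merger, which is
            // what raises the IllegalStateException seen in the stack trace.
            pairs.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        } catch (IllegalStateException e) {
            System.out.println("toMap failed: " + e.getMessage());
        }

        // The three-arg overload takes a merge function instead; keeping the
        // first value mirrors the distinctByKey filter in the patch above.
        Map<String, String> merged = pairs.stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (first, second) -> first));
        System.out.println(merged.get("20211206")); // prints rollback-A
    }
}
```

Using the merge function directly avoids the extra `ConcurrentHashMap` of seen keys, at the cost of silently dropping the later duplicate instead of filtering it explicitly.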

@nsivabalan
Contributor

Some of your text is non-English, but if I am not wrong, it's about having duplicate entries in the map returned from getPendingRollbackInfos. Can you put up a patch with the fix you have? I can help review and land it.

@nsivabalan added the core-flow-ds and priority:major (degraded perf; unable to move forward; potential bugs) labels Dec 12, 2021
@yanenze
Contributor Author

yanenze commented Dec 12, 2021 via email

@yanenze
Contributor Author

yanenze commented Dec 12, 2021

Some of your text is non-English, but if I am not wrong, it's about having duplicate entries in the map returned from getPendingRollbackInfos. Can you put up a patch with the fix you have? I can help review and land it.

I have sent the patch to your mailbox and look forward to your reply.

@minchowang
Contributor

Hi @yanenze, I also have this problem. Have you pushed the patch to the GitHub repository?

@danny0405
Contributor

I have fixed it altogether in this PR: #4296

@yanenze
Contributor Author

yanenze commented Dec 13, 2021

Hi @yanenze, I also have this problem. Have you pushed the patch to the GitHub repository?

Hi, this problem has been resolved in this PR: #4296

@yanenze yanenze closed this as completed Dec 13, 2021
@vinothchandar
Member

cc @nsivabalan

@nsivabalan
Contributor

@yanenze: Do you happen to know when this could happen? From what I see, this can happen only if we have two rollback instants with the same timestamp (not the commit that is being rolled back, but the rollback timestamp).
Can you paste the contents of the .hoodie folder from when this issue happened?

@yanenze
Contributor Author

yanenze commented Dec 16, 2021

@yanenze: Do you happen to know when this could happen? From what I see, this can happen only if we have two rollback instants with the same timestamp (not the commit that is being rolled back, but the rollback timestamp). Can you paste the contents of the .hoodie folder from when this issue happened?

I'm sorry, but I have not hit this problem in my test environment with the 0.10.0 release; maybe the data size was not big enough. In the production environment, since I modified the code, the problem has never occurred, and I inspected the data synchronized by Flink when the rollback action executed: the data was not lost or duplicated.

@waywtdcc
Contributor

Is there any other way to deal with this? There is no official release with the fix yet. Can a new version with this fix be released as soon as possible? This error occurs very easily.

@nsivabalan
Contributor

@waywtdcc: You can apply the linked patch for now. We are looking to do a minor release by early Jan. Will keep you posted.

@waywtdcc
Contributor

OK, thank you very much.
