Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix][Connector] Fix ConcurrentModificationException when snapshotState based on SourceReaderBase #4011

Merged
merged 1 commit into from
Feb 1, 2023

Conversation

hailin0
Copy link
Contributor

@hailin0 hailin0 commented Jan 31, 2023

Purpose of this pull request

Check list

@hailin0
Copy link
Contributor Author

hailin0 commented Jan 31, 2023

2023-01-31 10:23:30,740 ERROR org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation - [ws4]:5801 [seatunnel_default_cluster] [5.1] java.util.ConcurrentModificationException
java.util.concurrent.CompletionException: java.util.ConcurrentModificationException
	at com.hazelcast.spi.impl.AbstractInvocationFuture.returnOrThrowWithJoinConventions(AbstractInvocationFuture.java:819) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.AbstractInvocationFuture.resolveAndThrowWithJoinConvention(AbstractInvocationFuture.java:835) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.AbstractInvocationFuture.join(AbstractInvocationFuture.java:545) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at java.util.ArrayList.forEach(ArrayList.java:1255) ~[?:1.8.0_151]
	at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.sendToAllReader(SourceSplitEnumeratorTask.java:277) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.triggerBarrier(SourceSplitEnumeratorTask.java:139) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation.lambda$run$0(CheckpointBarrierTriggerOperation.java:77) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:47) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation.run(CheckpointBarrierTriggerOperation.java:72) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:40) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.sendOperationToMemberNode(CheckpointManager.java:228) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_151]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_151]
	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553) ~[?:1.8.0_151]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_151]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_151]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545) ~[?:1.8.0_151]
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:1.8.0_151]
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) ~[?:1.8.0_151]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.triggerCheckpoint(CheckpointCoordinator.java:425) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) ~[?:1.8.0_151]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_151]
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443) ~[?:1.8.0_151]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_151]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_151]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_151]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_151]
Caused by: java.util.ConcurrentModificationException
	at java.util.HashMap.forEach(HashMap.java:1292) ~[?:1.8.0_151]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.snapshotState(SourceReaderBase.java:118) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.snapshotState(IncrementalSourceReader.java:172) ~[?:?]
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.triggerBarrier(SourceFlowLifeCycle.java:177) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.triggerBarrier(SourceSeaTunnelTask.java:85) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation.lambda$run$0(BarrierFlowOperation.java:77) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:47) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation.run(BarrierFlowOperation.java:72) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:40) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.execution.TaskExecutionContext.sendToMember(TaskExecutionContext.java:43) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.lambda$sendToAllReader$3(SourceSplitEnumeratorTask.java:276) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	at java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1597) ~[?:1.8.0_151]
	at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.sendToAllReader(SourceSplitEnumeratorTask.java:275) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT]
	... 33 more

@hailin0
Copy link
Contributor Author

hailin0 commented Jan 31, 2023

PTAL

@TyrantLucifer TyrantLucifer merged commit cd2bd6a into apache:dev Feb 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants