checkpoint timeout #5555

Closed
Japson0 opened this issue Sep 26, 2023 · 15 comments

Japson0 commented Sep 26, 2023

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

I ran a long-running SeaTunnel job, and whenever it ran past the checkpoint timeout, it threw an error.

Looking into it, I found that the following configuration generates 3 pipeline subtasks, but CheckpointCoordinator only triggers the barrier on the starting subtasks each time. The remaining tasks wait until the checkpoint times out.
Is this because this part is not implemented yet?

In CheckpointCoordinator.java:

    private Set<Long> getNotYetAcknowledgedTasks() {
        // TODO: some tasks have completed and don't need to be ack
        return plan.getPipelineSubtasks().stream()
                .map(TaskLocation::getTaskID)
                .collect(Collectors.toCollection(CopyOnWriteArraySet::new));
    }

    public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier checkpointBarrier) {
        // TODO: some tasks have completed and don't need to trigger
        return plan.getStartingSubtasks().stream()
                .map(
                        taskLocation ->
                                new CheckpointBarrierTriggerOperation(
                                        checkpointBarrier, taskLocation))
                .map(checkpointManager::sendOperationToMemberNode)
                .toArray(InvocationFuture[]::new);
    }
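
To make the suspected mismatch concrete, here is a minimal, self-contained Java model. It is not SeaTunnel code: the class `BarrierAckSketch`, its methods and the `barrierPropagates` flag are hypothetical. It assumes the barrier is sent only to the starting subtasks while acknowledgements are expected from every pipeline subtask, so if the barrier never reaches the downstream tasks, their IDs stay in the not-yet-acknowledged set. The task IDs 20000, 40000 and 50000 are taken from the logs and comments in this thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical, simplified model of the trigger/ack mismatch; not SeaTunnel's API.
public class BarrierAckSketch {

    /** The coordinator expects an ack from every pipeline subtask. */
    static Set<Long> notYetAcknowledged(List<Long> pipelineSubtasks) {
        return new CopyOnWriteArraySet<>(pipelineSubtasks);
    }

    /**
     * Sends the barrier to the starting subtasks only. Starting tasks ack at once;
     * downstream tasks ack only if the barrier is propagated to them.
     * Returns the task IDs that are still pending afterwards.
     */
    static Set<Long> runCheckpoint(List<Long> pipelineSubtasks,
                                   List<Long> startingSubtasks,
                                   boolean barrierPropagates) {
        Set<Long> pending = notYetAcknowledged(pipelineSubtasks);
        for (Long start : startingSubtasks) {
            pending.remove(start); // starting task received the barrier and acked
        }
        if (barrierPropagates) {
            // Downstream tasks receive the barrier from upstream and ack too.
            pending.removeAll(pipelineSubtasks);
        }
        return pending; // anything left keeps the checkpoint pending until timeout
    }

    public static void main(String[] args) {
        List<Long> all = Arrays.asList(20000L, 40000L, 50000L);
        List<Long> starting = Arrays.asList(20000L);
        System.out.println(runCheckpoint(all, starting, true));  // []
        System.out.println(runCheckpoint(all, starting, false)); // [40000, 50000]
    }
}
```

In this model the trigger-only-at-the-start design is fine as long as the barrier flows downstream; the checkpoint stalls only when propagation is blocked.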

SeaTunnel Version

2.3.3

SeaTunnel Config

{
  "env": {
    "job.mode": "BATCH"
  },
  "source": [
    {
      "result_table_name": "jdbc_temp_table",
      "url": "jdbc:mysql://****:3306/test_src",
      "driver": "com.mysql.cj.jdbc.Driver",
      "user": "***",
      "password": "***",
      "query": "SELECT id, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15 FROM test_data_5",
      "plugin_name": "JDBC"
    }
  ],
  "sink": [
    {
      "source_table": "jdbc_temp_table",
      "url": "jdbc:mysql://***:3306/test_dst",
      "driver": "com.mysql.cj.jdbc.Driver",
      "user": "***",
      "password": "***",
      "query": "INSERT INTO test_data_5(id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
      "plugin_name": "JDBC"
    }
  ]
}

Running Command

./bin/seatunnel.sh --config conf.json

Error Exception

2023-09-26 10:29:52,676 WARN  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job my2my.json (758877844708261889), Pipeline: [(1/1)] checkpoint have error, cancel the pipeline
2023-09-26 10:29:52,676 WARN  org.apache.seatunnel.engine.server.dag.physical.SubPlan - start cancel job Job my2my.json (758877844708261889), Pipeline: [(1/1)] count = 0
2023-09-26 10:29:52,677 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=1}] state in Imap is RUNNING, not equals expected state CREATED
2023-09-26 10:29:52,677 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=1}] state in Imap is RUNNING, not equals expected state SCHEDULED
2023-09-26 10:29:52,677 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=1}] state in Imap is RUNNING, not equals expected state DEPLOYING
2023-09-26 10:29:52,677 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=30000}] state in Imap is RUNNING, not equals expected state CREATED
2023-09-26 10:29:52,678 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=30000}] state in Imap is RUNNING, not equals expected state SCHEDULED
2023-09-26 10:29:52,678 WARN  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - The task Job my2my.json (758877844708261889), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-JDBC-jdbc_temp_table]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=30000}] state in Imap is RUNNING, not equals expected state DEPLOYING
2023-09-26 10:29:52,678 ERROR org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job my2my.json (758877844708261889), Pipeline: [(1/1)] cancel error with exception: java.lang.InterruptedException
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipelineTasks(SubPlan.java:461)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipeline(SubPlan.java:417)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.handleCheckpointError(SubPlan.java:659)
	at org.apache.seatunnel.engine.server.master.JobMaster.lambda$handleCheckpointError$2(JobMaster.java:345)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.apache.seatunnel.engine.server.master.JobMaster.handleCheckpointError(JobMaster.java:342)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.handleCheckpointError(CheckpointManager.java:180)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:268)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:539)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

2023-09-26 10:29:52,681 WARN  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-718890] [5.1] Interrupted task 20000 - org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask@6f8ceee0
2023-09-26 10:29:52,682 ERROR org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation - [localhost]:5801 [seatunnel-718890] [5.1] null
java.lang.NullPointerException: null
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:689) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:270) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81) ~[classes/:?]
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) [hazelcast-5.1.jar:5.1]
2023-09-26 10:29:52,575 ERROR org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - trigger checkpoint failed
org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml or jobConfig env.
	at org.apache.seatunnel.engine.server.checkpoint.PendingCheckpoint.abortCheckpoint(PendingCheckpoint.java:176) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$cleanPendingCheckpoint$19(CheckpointCoordinator.java:652) ~[classes/:?]
	at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4707) ~[?:1.8.0_202]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.cleanPendingCheckpoint(CheckpointCoordinator.java:650) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:263) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:539) ~[classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_202]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_202]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_202]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2023-09-26 10:29:52,681 WARN  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-718890] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@5d235d6b
java.lang.RuntimeException: java.lang.InterruptedException
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:41) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:46) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:28) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:206) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:123) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader.pollNext(JdbcSourceReader.java:64) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:105) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:167) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:110) ~[classes/:?]
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:611) [classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_202]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_202]
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) ~[?:1.8.0_202]
	at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353) ~[?:1.8.0_202]
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:39) ~[classes/:?]
	... 15 more
2023-09-26 10:29:52,686 ERROR org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - report error from task
org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.NullPointerException
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:41)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:46)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:28)
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:206)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.triggerBarrier(SourceFlowLifeCycle.java:268)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.triggerBarrier(SourceSeaTunnelTask.java:122)
	at org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation.lambda$null$0(BarrierFlowOperation.java:90)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
	at com.hazelcast.spi.impl.AbstractInvocationFuture.returnOrThrowWithJoinConventions(AbstractInvocationFuture.java:819)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.resolveAndThrowWithJoinConvention(AbstractInvocationFuture.java:835)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.join(AbstractInvocationFuture.java:553)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.ack(SeaTunnelTask.java:347)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:66)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:39)
	... 10 more
Caused by: java.lang.NullPointerException
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:689)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:270)
	at org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)

	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:346) [classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:188) [classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.run(CheckpointErrorReportOperation.java:48) [classes/:?]
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) [hazelcast-5.1.jar:5.1]
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) [hazelcast-5.1.jar:5.1]
2023-09-26 10:29:52,685 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-718890] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=758877844708261889, pipelineId=1, taskGroupId=30000}
2023-09-26 10:29:53,124 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:54,115 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:54,691 WARN  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job my2my.json (758877844708261889), Pipeline: [(1/1)] cancel error will retry
java.lang.InterruptedException: null
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347) ~[?:1.8.0_202]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_202]
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipelineTasks(SubPlan.java:461) ~[classes/:?]
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipeline(SubPlan.java:417) ~[classes/:?]
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.handleCheckpointError(SubPlan.java:659) ~[classes/:?]
	at org.apache.seatunnel.engine.server.master.JobMaster.lambda$handleCheckpointError$2(JobMaster.java:345) ~[classes/:?]
	at java.util.ArrayList.forEach(ArrayList.java:1257) ~[?:1.8.0_202]
	at org.apache.seatunnel.engine.server.master.JobMaster.handleCheckpointError(JobMaster.java:342) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.handleCheckpointError(CheckpointManager.java:180) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:268) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:539) ~[classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_202]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_202]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_202]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2023-09-26 10:29:54,691 WARN  org.apache.seatunnel.engine.server.dag.physical.SubPlan - start cancel job Job my2my.json (758877844708261889), Pipeline: [(1/1)] count = 1
2023-09-26 10:29:55,099 DEBUG org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - start send heartbeat to resource manager, this address: [localhost]:5801
2023-09-26 10:29:55,099 DEBUG org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager - received worker heartbeat from: [localhost]:5801
2023-09-26 10:29:55,115 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:56,117 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:57,113 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:58,124 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:59,116 DEBUG com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor - [localhost]:5801 [seatunnel-718890] [5.1] Invocations:1 timeouts:0 backup-timeouts:0
2023-09-26 10:29:59,854 DEBUG org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-718890] [5.1] 

Zeta or Flink or Spark Version

2.3.3

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Japson0 added the bug label Sep 26, 2023

lvf0052 commented Sep 26, 2023

+1
I have the same question

liugddx (Member) commented Sep 26, 2023

I will check this.

Japson0 (Author) commented Sep 28, 2023

Going through the code, this configuration generates three taskLocations, with IDs 20000, 40000, and 50000.
However, the tasks with IDs 40000 and 50000 never acknowledge the checkpoint, so the PendingCheckpoint never becomes fully acknowledged (isFullyAcknowledged() keeps returning false).

PendingCheckpoint.java (starting at line 145):

    if (isFullyAcknowledged()) {
        LOG.debug("checkpoint is full ack!");
        completableFuture.complete(toCompletedCheckpoint());
    }

As a result, the checkpoint never completes and eventually times out.
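
The stall-then-timeout behaviour can be modelled with a CompletableFuture that completes only once every expected task has acknowledged, and that a timer fails otherwise. This is a simplified sketch under assumptions: `PendingCheckpointSketch`, `run` and the 200 ms timeout are invented for illustration; only the `isFullyAcknowledged()` check and the "Checkpoint expired before completing" message come from this thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a pending checkpoint; not SeaTunnel's PendingCheckpoint class.
public class PendingCheckpointSketch {
    private final Set<Long> notYetAcked = ConcurrentHashMap.newKeySet();
    private final CompletableFuture<String> result = new CompletableFuture<>();

    PendingCheckpointSketch(Set<Long> expectedTasks, long timeoutMs,
                            ScheduledExecutorService timer) {
        notYetAcked.addAll(expectedTasks);
        // Abort if not every task has acked by the deadline.
        // completeExceptionally is a no-op if the future already completed.
        timer.schedule(
                () -> result.completeExceptionally(
                        new TimeoutException("Checkpoint expired before completing")),
                timeoutMs, TimeUnit.MILLISECONDS);
    }

    void acknowledge(long taskId) {
        notYetAcked.remove(taskId);
        if (notYetAcked.isEmpty()) { // the equivalent of isFullyAcknowledged()
            result.complete("completed");
        }
    }

    /** Creates a checkpoint, delivers acks from ackingTasks, and waits for the outcome. */
    static String run(Set<Long> expected, Set<Long> ackingTasks, long timeoutMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            PendingCheckpointSketch checkpoint =
                    new PendingCheckpointSketch(expected, timeoutMs, timer);
            ackingTasks.forEach(checkpoint::acknowledge);
            return checkpoint.result.get();
        } catch (ExecutionException e) {
            return "aborted: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Set<Long> expected = new HashSet<>(Arrays.asList(20000L, 40000L, 50000L));
        System.out.println(run(expected, expected, 200)); // completed
        System.out.println(run(expected, Collections.singleton(20000L), 200));
        // aborted: Checkpoint expired before completing
    }
}
```

Note that raising the timeout in this model only delays the abort; it cannot help if the missing acks never arrive.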

Japson0 (Author) commented Sep 28, 2023

When using JDBC to read a very large table, I found that JdbcSourceReader holds a lock while reading data, and triggerBarrier in SourceFlowLifeCycle cannot acquire that lock, so this task cannot acknowledge the checkpoint.

Even though I don't need checkpoints, the job still fails once it exceeds the checkpoint timeout.

Do I have to set the timeout to the maximum integer value, or set the interval so large that a checkpoint is never triggered? Is there a configuration option that disables checkpointing? This kind of data doesn't need to be checkpointed in the first place.
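
The lock contention described above can be illustrated with a plain ReentrantLock. In this hypothetical sketch, a reader thread holds a shared lock for the whole duration of a slow or blocked read (the log above shows the source thread parked in ArrayBlockingQueue.put inside the collect path), while the barrier side waits for the same lock with a bounded tryLock. The names `LockContentionSketch`, `tryInjectBarrier` and the durations are assumptions, not SeaTunnel code; the point is only that a barrier needing the lock cannot be emitted until the read releases it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: a reader holds the lock during a slow read, so a barrier
// that needs the same lock cannot be injected within its wait budget.
public class LockContentionSketch {

    /** Returns true if the barrier side acquired the lock within barrierWaitMs. */
    static boolean tryInjectBarrier(long readDurationMs, long barrierWaitMs) {
        ReentrantLock checkpointLock = new ReentrantLock();
        CountDownLatch readerHoldsLock = new CountDownLatch(1);

        Thread reader = new Thread(() -> {
            checkpointLock.lock();
            try {
                readerHoldsLock.countDown();
                Thread.sleep(readDurationMs); // stands in for a slow or blocked read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                checkpointLock.unlock();
            }
        });

        try {
            reader.start();
            readerHoldsLock.await(); // make sure the reader owns the lock first
            boolean acquired = checkpointLock.tryLock(barrierWaitMs, TimeUnit.MILLISECONDS);
            if (acquired) {
                checkpointLock.unlock(); // barrier emitted, release immediately
            }
            reader.join();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Read ends well before the wait budget: the barrier gets the lock.
        System.out.println(tryInjectBarrier(20, 1000));
        // Read outlasts the wait budget: the barrier gives up.
        System.out.println(tryInjectBarrier(1000, 50));
    }
}
```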

liugddx (Member) commented Oct 3, 2023

Can you increase the checkpoint timeout? Refer to https://seatunnel.apache.org/docs/2.3.3/seatunnel-engine/deployment#43-checkpoint-manager

Japson0 (Author) commented Oct 7, 2023

So is this just a timeout problem?
In theory, if I synchronize a very large table via JDBC, won't I still get an error whenever the sync runs past the timeout?

Japson0 closed this as completed Oct 9, 2023
lihjChina (Contributor)

+1
I have the same question.

SamealD commented Oct 25, 2023

I have the same question. When I synchronize a big table, I set checkpoint.timeout to a very big number, but I still get an error saying CheckpointException: Checkpoint expired before completing. Please increase checkpoint timeout ...

liugddx (Member) commented Oct 25, 2023

Refer to #5694. Try to use the latest code.

SamealD commented Oct 27, 2023

I used the latest code and recompiled, but the timeout error still occurs.

happyboy1024 (Contributor)

You can try to set the 'partition_column' and 'partition_num' parameters to split your table, and then set the checkpoint interval based on the processing speed of each partition.
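
A rough way to apply this advice is to work out the per-partition workload. The sketch below is a hypothetical approximation, not SeaTunnel's split logic: it divides an inclusive numeric key range into `partition_num` near-equal ranges (assuming dense, evenly distributed keys in `partition_column`) and estimates the time per partition from an assumed throughput. For example, 10,000,000 rows split into 10 partitions gives 1,000,000 rows each; at an assumed 20,000 rows/s that is about 50 s per partition, a figure to weigh against the configured checkpoint interval and timeout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a numeric key range into partition_num ranges and
// estimate the processing time of each one. Not SeaTunnel's actual splitter.
public class PartitionSizingSketch {

    /** Splits [min, max] (inclusive) into at most n contiguous, near-equal ranges. */
    static List<long[]> split(long min, long max, int n) {
        if (n <= 0 || max < min) {
            throw new IllegalArgumentException("need n > 0 and max >= min");
        }
        List<long[]> ranges = new ArrayList<>();
        long total = max - min + 1;
        long base = total / n;
        long extra = total % n; // the first `extra` ranges get one more key
        long start = min;
        for (int i = 0; i < n; i++) {
            long size = base + (i < extra ? 1 : 0);
            if (size == 0) {
                break; // fewer keys than partitions
            }
            ranges.add(new long[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }

    /** Seconds needed to read one range at an assumed throughput (rows per second). */
    static double secondsPerPartition(long[] range, double rowsPerSecond) {
        return (range[1] - range[0] + 1) / rowsPerSecond;
    }

    public static void main(String[] args) {
        for (long[] r : split(1, 10_000_000, 10)) {
            System.out.printf("[%d, %d] ~%.0f s%n", r[0], r[1], secondsPerPartition(r, 20_000));
        }
    }
}
```

Real tables rarely have dense keys, so the measured throughput of one partition is a better input than this estimate.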

voyagertanyao

+1
I have the same question.

Aiden-Rose

+1
I have the same question.

W-dragan commented Mar 8, 2024

If it is a batch job and no checkpoint timeout is set in the job config, in theory no checkpoint should be triggered. I recently ran into this problem when processing a large batch of data. The problem is not the size of the timeout setting, but whether a checkpoint should be triggered at all. @liugddx
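
The behaviour proposed here can be expressed as a small decision function. This is a hypothetical sketch of the proposal, not SeaTunnel's implementation: it assumes the job env uses the keys `job.mode`, `checkpoint.interval` and `checkpoint.timeout`, treats a missing `job.mode` as BATCH, and schedules checkpoints for a batch job only when the env sets one of the checkpoint keys explicitly. `CheckpointPolicySketch` and `engineDefaultMs` are invented names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of "batch jobs only checkpoint when asked to".
public class CheckpointPolicySketch {

    /**
     * Returns the checkpoint interval to schedule, or empty when no checkpoint
     * should be triggered. Env keys and the BATCH default are assumptions.
     */
    static OptionalLong checkpointIntervalMs(Map<String, String> env, long engineDefaultMs) {
        String mode = env.getOrDefault("job.mode", "BATCH");
        String interval = env.get("checkpoint.interval");
        boolean checkpointConfigured =
                interval != null || env.containsKey("checkpoint.timeout");
        if ("BATCH".equalsIgnoreCase(mode) && !checkpointConfigured) {
            return OptionalLong.empty(); // batch job without checkpoint settings
        }
        return OptionalLong.of(interval != null ? Long.parseLong(interval) : engineDefaultMs);
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("job.mode", "BATCH");
        System.out.println(checkpointIntervalMs(env, 300_000)); // OptionalLong.empty
        env.put("checkpoint.interval", "10000");
        System.out.println(checkpointIntervalMs(env, 300_000)); // OptionalLong[10000]
    }
}
```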

liugddx (Member) commented Mar 8, 2024

Version 2.3.4 supports disabling checkpoints.
