[Bug2.3.4] [postgresqlCDC] When updating the source table, the synchronization task stops with an error #6407

Open
2 of 3 tasks
2606090723 opened this issue Feb 28, 2024 · 6 comments


Search before asking

  • I searched the issues and found no similar ones.

What happened

2024-02-28 08:45:23,237 ERROR [o.a.s.c.s.SeaTunnel ] [main] - ===============================================================================

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
    ... 2 more

2024-02-28 08:45:23,260 INFO [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is:
INSERT INTO "test_db"."public"."t_user_2" ("id", "name", "age", "update_time") VALUES (?, ?, ?, ?) ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "name"=EXCLUDED."name", "age"=EXCLUDED."age", "update_time"=EXCLUDED."update_time"

2024-02-28 08:45:23,265 INFO [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is:
DELETE FROM "test_db"."public"."t_user_2" WHERE "id" = ?

2024-02-28 08:45:23,267 INFO [.JdbcMultiTableResourceManager] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - start close connection poolHikariPool-4
2024-02-28 08:45:23,268 INFO [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Shutdown initiated...
2024-02-28 08:45:23,272 WARN [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Interrupted during closing
java.lang.InterruptedException: null
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067) ~[?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) ~[?:1.8.0_342]
at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariPool.shutdown(HikariPool.java:255) ~[?:?]
at org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource.close(HikariDataSource.java:351) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.ConnectionPoolManager.close(ConnectionPoolManager.java:66) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager.close(JdbcMultiTableResourceManager.java:42) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.close(MultiTableSinkWriter.java:251) ~[seatunnel-transforms-v2.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:153) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.4]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) [?:1.8.0_342]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) [?:1.8.0_342]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) [?:1.8.0_342]
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) [?:1.8.0_342]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_342]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_342]
at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) [?:1.8.0_342]
at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) [?:1.8.0_342]
at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_342]
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_342]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_342]
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) [?:1.8.0_342]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) [?:1.8.0_342]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_342]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) [?:1.8.0_342]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) [?:1.8.0_342]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327) [seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:660) [seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) [seatunnel-starter.jar:2.3.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
2024-02-28 08:45:23,899 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher
2024-02-28 08:45:23,900 INFO [i.d.j.JdbcConnection ] [pool-64-thread-1] - Connection gracefully closed
2024-02-28 08:45:23,902 INFO [i.d.j.JdbcConnection ] [pool-65-thread-1] - Connection gracefully closed
2024-02-28 08:45:23,903 INFO [i.d.j.JdbcConnection ] [pool-66-thread-1] - Connection gracefully closed
2024-02-28 08:45:24,304 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-02-28 08:45:24,316 INFO [s.c.s.s.c.ClientExecuteCommand] [Thread-105] - run shutdown hook because get close signal
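
The trace fails inside `extractBeforeRow`, which suggests the UPDATE event reached the deserializer without a "before" row image. One possible cause, which is an assumption and not confirmed in this report, is that the source table's REPLICA IDENTITY is not FULL, so PostgreSQL logical decoding does not ship the old row for updates. The sketch below uses hypothetical stand-in types (`ChangeEvent`, `convert`, and this `extractBeforeRow` are illustrative, not the SeaTunnel or Debezium classes) to show how converting a missing before image without a null check throws this kind of `NullPointerException`, and how a guard would avoid it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class BeforeImageSketch {

    // Hypothetical stand-in for a change event; not the Debezium or SeaTunnel type.
    static final class ChangeEvent {
        final Map<String, Object> before; // null when the old row was not captured
        final Map<String, Object> after;

        ChangeEvent(Map<String, Object> before, Map<String, Object> after) {
            this.before = before;
            this.after = after;
        }
    }

    // Unguarded conversion: dereferences the row without checking for null.
    static Object[] convert(Map<String, Object> row) {
        return row.values().toArray(); // throws NullPointerException if row is null
    }

    // Guarded variant: returns empty when the event carries no before image.
    static Optional<Object[]> extractBeforeRow(ChangeEvent event) {
        if (event.before == null) {
            return Optional.empty();
        }
        return Optional.of(convert(event.before));
    }

    public static void main(String[] args) {
        Map<String, Object> after = new HashMap<>();
        after.put("id", 1);
        after.put("name", "alice");

        // An UPDATE event whose old row image is missing.
        ChangeEvent update = new ChangeEvent(null, after);

        try {
            convert(update.before);
        } catch (NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
        System.out.println("guarded present: " + extractBeforeRow(update).isPresent());
    }
}
```

If the assumption holds, checking the replica identity of `public.t_user` on the source database would be the first thing to verify before rerunning the job.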

SeaTunnel Version

SeaTunnel 2.3.4, Postgres-CDC connector

SeaTunnel Config

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second=7000000
  read_limit.rows_per_second=400
}

source {
  Postgres-CDC {
    username = "postgres"
    password = "postgres"
    database-names = ["test_db"]
    schema-names = ["public"]
    table-names = ["test_db.public.t_user"]
    base-url = "jdbc:postgresql://192.168.3.46:30028/test_db?loggerLevel=OFF"
  }
}

transform {

}
sink {
  jdbc {
    url = "jdbc:postgresql://192.168.3.46:30028/test_db?loggerLevel=OFF"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "postgres"

    generate_sink_sql = true
    # You need to configure both database and table
    database = test_db
    table = "public.t_user_2"
    primary_keys = ["id"]
   
  }
}

Running Command

root@7366e5930d9f:/opt/apache-seatunnel-2.3.4# ./bin/seatunnel.sh --config ./config/pgsqlcdc.template -e local

Error Exception

2024-02-28 08:45:23,064 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)] future complete with state FAILED
2024-02-28 08:45:23,064 ERROR [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Task TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000} Failed in Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], Begin to cancel other tasks in this pipeline.
2024-02-28 08:45:23,065 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] turned from state RUNNING to FAILING.
2024-02-28 08:45:23,065 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] state process is start
2024-02-28 08:45:23,066 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] turned from state RUNNING to CANCELING.
2024-02-28 08:45:23,067 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Send cancel Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] operator to member [localhost]:5801
2024-02-28 08:45:23,068 INFO  [o.a.s.e.s.TaskExecutionService] [seatunnel-coordinator-service-2] - [localhost]:5801 [seatunnel-347553] [5.1] Task (TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}) need cancel.
2024-02-28 08:45:23,069 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SourceTask (1/1)] state process is start
2024-02-28 08:45:23,069 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - [localhost]:5801 [seatunnel-347553] [5.1] Interrupted task 20000 - org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask@1be5e807
2024-02-28 08:45:23,070 INFO  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - [localhost]:5801 [seatunnel-347553] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}
2024-02-28 08:45:23,070 INFO  [o.a.s.e.c.l.ClassLoaderUtil   ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - recycle classloader org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader@1c65c15f
2024-02-28 08:45:23,072 INFO  [o.a.s.e.c.l.ClassLoaderUtil   ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}
2024-02-28 08:45:23,073 INFO  [.e.IncrementalSourceEnumerator] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}] - Closing enumerator...
2024-02-28 08:45:23,073 INFO  [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-2] - [localhost]:5801 [seatunnel-347553] [5.1] Task TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1} complete with state CANCELED
2024-02-28 08:45:23,073 INFO  [o.a.s.e.s.CoordinatorService  ] [hz.main.seaTunnel.task.thread-2] - [localhost]:5801 [seatunnel-347553] [5.1] Received task end from execution TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=1}, state CANCELED
2024-02-28 08:45:23,075 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] turned from state CANCELING to CANCELED.
2024-02-28 08:45:23,075 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] state process is stopped
2024-02-28 08:45:23,075 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Postgres-CDC]-SplitEnumerator (1/1)] future complete with state CANCELED
2024-02-28 08:45:23,076 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-2] - start clean pending checkpoint cause Pipeline turn to end state.
2024-02-28 08:45:23,079 INFO  [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Closing Source Reader.
2024-02-28 08:45:23,080 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-2] - Turn checkpoint_state_815141672055734273_1 state from RUNNING to CANCELED
2024-02-28 08:45:23,081 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] will end with state FAILED
2024-02-28 08:45:23,081 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] turned from state FAILING to FAILED.
2024-02-28 08:45:23,085 WARN  [o.a.s.s.c.z.h.HikariConfig    ] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - idleTimeout has been set but has no effect because the pool is operating as a fixed size pool.
2024-02-28 08:45:23,085 INFO  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Starting...
2024-02-28 08:45:23,090 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Shutting down split fetcher 0
2024-02-28 08:45:23,102 INFO  [o.a.s.e.s.m.JobMaster         ] [seatunnel-coordinator-service-2] - release the pipeline Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] resource
2024-02-28 08:45:23,102 INFO  [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-46] - received slot release request, jobID: 815141672055734273, slot: SlotProfile{worker=[localhost]:5801, slotID=7, ownerJobID=815141672055734273, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='af5ae013-144f-4faa-937b-8981475315eb'}
2024-02-28 08:45:23,103 INFO  [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-47] - received slot release request, jobID: 815141672055734273, slot: SlotProfile{worker=[localhost]:5801, slotID=8, ownerJobID=815141672055734273, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='af5ae013-144f-4faa-937b-8981475315eb'}
2024-02-28 08:45:23,104 INFO  [o.a.s.e.s.d.p.SubPlan         ] [seatunnel-coordinator-service-2] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] state process is stop
2024-02-28 08:45:23,104 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273), Pipeline: [(1/1)] future complete with state FAILED
2024-02-28 08:45:23,105 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - cancel job Job SeaTunnel_Job (815141672055734273) because makeJobEndWhenPipelineEnded is true
2024-02-28 08:45:23,106 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) turned from state RUNNING to FAILING.
2024-02-28 08:45:23,108 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) turned from state FAILING to FAILED.
2024-02-28 08:45:23,109 INFO  [o.a.s.e.s.d.p.PhysicalPlan    ] [seatunnel-coordinator-service-3] - Job SeaTunnel_Job (815141672055734273) state process is stop
2024-02-28 08:45:23,137 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Job (815141672055734273) end with state FAILED
2024-02-28 08:45:23,138 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-02-28 08:45:23,142 INFO  [c.h.i.s.t.TcpServerConnection ] [hz.main.IO.thread-in-1] - [localhost]:5801 [seatunnel-347553] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:35765, qualifier=null, endpoint=[127.0.0.1]:35765, remoteUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
2024-02-28 08:45:23,143 INFO  [.c.i.c.ClientConnectionManager] [main] - hz.client_1 [seatunnel-347553] [5.1] Removed connection to endpoint: [localhost]:5801:6e1d5e93-9d1b-4167-a038-b76888e088fe, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:35765->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2024-02-28 08:45:23.129, lastWriteTime=2024-02-28 08:45:19.110, closedTime=2024-02-28 08:45:23.141, connected server version=5.1}
2024-02-28 08:45:23,144 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-02-28 08:45:23,147 INFO  [c.h.c.i.ClientEndpointManager ] [hz.main.event-1] - [localhost]:5801 [seatunnel-347553] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:35765, qualifier=null, endpoint=[127.0.0.1]:35765, remoteUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=b59847b8-8f89-4ef1-bf02-3ebe18c4f582, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1709109719041, latest clientAttributes=lastStatisticsCollectionTime=1709109919108,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1709109719030,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=2709667840,os.freePhysicalMemorySize=320233472,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=1048576,os.openFileDescriptorCount=65,os.processCpuTime=12630000000,os.systemLoadAverage=0.62,os.totalPhysicalMemorySize=8280195072,os.totalSwapSpaceSize=0,runtime.availableProcessors=1,runtime.freeMemory=161719896,runtime.maxMemory=518979584,runtime.totalMemory=260177920,runtime.uptime=203553,runtime.usedMemory=98460344, labels=[]}
2024-02-28 08:45:23,151 INFO  [c.h.c.LifecycleService        ] [main] - hz.client_1 [seatunnel-347553] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-02-28 08:45:23,152 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed SeaTunnel client......
2024-02-28 08:45:23,153 INFO  [c.h.c.LifecycleService        ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] [localhost]:5801 is SHUTTING_DOWN
2024-02-28 08:45:23,158 INFO  [c.h.i.p.i.MigrationManager    ] [hz.main.cached.thread-6] - [localhost]:5801 [seatunnel-347553] [5.1] Shutdown request of Member [localhost]:5801 - 6e1d5e93-9d1b-4167-a038-b76888e088fe this is handled
2024-02-28 08:45:23,164 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Shutting down connection manager...
2024-02-28 08:45:23,172 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Shutting down node engine...
2024-02-28 08:45:23,211 INFO  [c.h.i.i.NodeExtension         ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Destroying node NodeExtension.
2024-02-28 08:45:23,212 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] Hazelcast Shutdown is completed in 56 ms.
2024-02-28 08:45:23,212 INFO  [c.h.c.LifecycleService        ] [main] - [localhost]:5801 [seatunnel-347553] [5.1] [localhost]:5801 is SHUTDOWN
2024-02-28 08:45:23,214 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed HazelcastInstance ......
2024-02-28 08:45:23,216 INFO  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Start completed.
2024-02-28 08:45:23,218 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2024-02-28 08:45:23,219 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 

===============================================================================


2024-02-28 08:45:23,220 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error, 

2024-02-28 08:45:23,221 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-02-28 08:45:23,222 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed 

2024-02-28 08:45:23,223 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
        ... 2 more
 
2024-02-28 08:45:23,237 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
===============================================================================



Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:222)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:189)
        at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:157)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:132)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:110)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:116)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
        ... 2 more
2024-02-28 08:45:23,260 INFO  [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is:
INSERT INTO "test_db"."public"."t_user_2" ("id", "name", "age", "update_time") VALUES (?, ?, ?, ?) ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "name"=EXCLUDED."name", "age"=EXCLUDED."age", "update_time"=EXCLUDED."update_time"

2024-02-28 08:45:23,265 INFO  [.e.FieldNamedPreparedStatement] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - PrepareStatement sql is:
DELETE FROM "test_db"."public"."t_user_2" WHERE "id" = ?

2024-02-28 08:45:23,267 INFO  [.JdbcMultiTableResourceManager] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - start close connection poolHikariPool-4
2024-02-28 08:45:23,268 INFO  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Shutdown initiated...
2024-02-28 08:45:23,272 WARN  [o.a.s.s.c.z.h.HikariDataSource] [BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - HikariPool-4 - Interrupted during closing
java.lang.InterruptedException: null
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067) ~[?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) ~[?:1.8.0_342]
        at org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariPool.shutdown(HikariPool.java:255) ~[?:?]
        at org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource.close(HikariDataSource.java:351) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.ConnectionPoolManager.close(ConnectionPoolManager.java:66) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager.close(JdbcMultiTableResourceManager.java:42) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.close(MultiTableSinkWriter.java:251) ~[seatunnel-transforms-v2.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:153) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.4]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) [?:1.8.0_342]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) [?:1.8.0_342]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) [?:1.8.0_342]
        at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) [?:1.8.0_342]
        at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_342]
        at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_342]
        at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) [?:1.8.0_342]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) [?:1.8.0_342]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_342]
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) [?:1.8.0_342]
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) [?:1.8.0_342]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327) [seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:660) [seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) [seatunnel-starter.jar:2.3.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_342]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_342]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_342]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
2024-02-28 08:45:23,899 INFO  [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher
2024-02-28 08:45:23,900 INFO  [i.d.j.JdbcConnection          ] [pool-64-thread-1] - Connection gracefully closed
2024-02-28 08:45:23,902 INFO  [i.d.j.JdbcConnection          ] [pool-65-thread-1] - Connection gracefully closed
2024-02-28 08:45:23,903 INFO  [i.d.j.JdbcConnection          ] [pool-66-thread-1] - Connection gracefully closed
2024-02-28 08:45:24,304 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=815141672055734273, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-02-28 08:45:24,316 INFO  [s.c.s.s.c.ClientExecuteCommand] [Thread-105] - run shutdown hook because get close signal

Zeta or Flink or Spark Version

zeta

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@2606090723 2606090723 added the bug label Feb 28, 2024
@2606090723 2606090723 changed the title from "[Bug2.3.4] [postgresqlCDC]对源表做update操作时 ,同步报错停止" to "[Bug2.3.4] [postgresqlCDC]对源表做update操作时 ,同步任务报错停止When updating the source table, the synchronization task stops with an error" Feb 29, 2024

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

@github-actions github-actions bot added the stale label Mar 31, 2024
@xuchangqun

+1

@chess3cake

chess3cake commented Jun 13, 2024

The same problem also occurs when deleting from the source table. PostgreSQL version 15, driver version 42.7.3.

@hailin0
Member

hailin0 commented Jun 13, 2024

Please check your database settings
https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Postgre-CDC.md#using-dependency

Here are the steps to enable CDC (Change Data Capture) in PostgreSQL:

Ensure that wal_level is set to logical: add "wal_level = logical" to the postgresql.conf configuration file, then restart the PostgreSQL server for the change to take effect. Alternatively, you can set it with SQL. Note that wal_level is only applied at server start, so a restart is still required after ALTER SYSTEM; pg_reload_conf() alone does not activate it:

ALTER SYSTEM SET wal_level TO 'logical';
SELECT pg_reload_conf();

-- Set the REPLICA IDENTITY of the specified table to FULL
ALTER TABLE your_table_name REPLICA IDENTITY FULL;
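
Whether these settings are actually in effect can be checked from a psql session. The following is a minimal sketch that uses only standard PostgreSQL commands and catalog columns; `your_table_name` is the same placeholder as above, and the `public` schema is an assumption taken from the log output in this issue.

```sql
-- Should return 'logical'. A changed wal_level only shows up here
-- after the server has been restarted.
SHOW wal_level;

-- relreplident values: 'd' = default (primary key), 'n' = nothing,
-- 'f' = full, 'i' = index.
-- 'f' is expected after ALTER TABLE ... REPLICA IDENTITY FULL.
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'public.your_table_name'::regclass;
```

If relreplident is not 'f', update and delete events may not carry a complete before-image of the row. That would be consistent with the NullPointerException raised in extractBeforeRow, although this thread does not confirm it as the root cause.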

@chess3cake

I guess the settings are correct.

@hailin0
Member

hailin0 commented Jun 13, 2024

check

ALTER TABLE your_table_name REPLICA IDENTITY FULL;
