
Pipe: Fix deadlock when PipeTaskAgent.handlePipeMetaChanges and PipeTaskDataNodeAgent.stopAllPipesWithCriticalException are invoked concurrently & Fix concurrent issues caused by addFailureEventToRetryQueue & transferQueuedEventsIfNecessary #11893

Merged
SteveYurongSu merged 5 commits into master from fix-pipe-deadlock on Jan 15, 2024

Conversation


@SteveYurongSu SteveYurongSu commented Jan 14, 2024

Summary

1. Fix deadlock when PipeTaskAgent.handlePipeMetaChanges and PipeTaskDataNodeAgent.stopAllPipesWithCriticalException are invoked concurrently

```
Found one Java-level deadlock:
=============================
"pool-41-IoTDB-DataNodeInternalRPC-Processor-2":
  waiting to lock monitor 0x00000299461ba110 (object 0x000000054e91ea58, a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask),
  which is held by "pool-25-IoTDB-Pipe-SubTask-Callback-Executor-Pool-1"

"pool-25-IoTDB-Pipe-SubTask-Callback-Executor-Pool-1":
  waiting to lock monitor 0x000002994731e980 (object 0x0000000404acc078, a org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent),
  which is held by "pool-41-IoTDB-DataNodeInternalRPC-Processor-2"

Java stack information for the threads listed above:
===================================================
"pool-41-IoTDB-DataNodeInternalRPC-Processor-2":
        at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.submitSelf(PipeConnectorSubtask.java:297)
        - waiting to lock <0x000000054e91ea58> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask)
        at org.apache.iotdb.commons.pipe.execution.executor.PipeSubtaskExecutor.start(PipeSubtaskExecutor.java:90)
        - locked <0x000000054e91e840> (a org.apache.iotdb.db.pipe.execution.executor.dataregion.PipeDataRegionConnectorSubtaskExecutor)
        at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle.start(PipeConnectorSubtaskLifeCycle.java:121)
        - locked <0x000000054e02eb90> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle)
        at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager.start(PipeConnectorSubtaskManager.java:155)
        - locked <0x000000054e02ea40> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager)
        at org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage.startSubtask(PipeTaskConnectorStage.java:66)
        at org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage.start(PipeTaskStage.java:86)
        - locked <0x000000054ee138f8> (a org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage)
        at org.apache.iotdb.db.pipe.task.PipeDataNodeTask.start(PipeDataNodeTask.java:67)
        at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.startPipe(PipeTaskAgent.java:513)
        at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.executeSinglePipeRuntimeMetaChanges(PipeTaskAgent.java:248)
        at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.executeSinglePipeMetaChanges(PipeTaskAgent.java:168)
        at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.handlePipeMetaChangesInternal(PipeTaskAgent.java:325)
        at org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent.handlePipeMetaChanges(PipeTaskAgent.java:306)
        - locked <0x0000000404acc078> (a org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent)
        at org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl.pushPipeMeta(DataNodeInternalRPCServiceImpl.java:961)
        at org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService$Processor$pushPipeMeta.getResult(IDataNodeRPCService.java:5746)
        at org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService$Processor$pushPipeMeta.getResult(IDataNodeRPCService.java:5726)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:38)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.8/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.8/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)
"pool-25-IoTDB-Pipe-SubTask-Callback-Executor-Pool-1":
        at org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent.stopAllPipesWithCriticalException(PipeTaskDataNodeAgent.java:79)
        - waiting to lock <0x0000000404acc078> (a org.apache.iotdb.db.pipe.agent.task.PipeTaskDataNodeAgent)
        at org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent.report(PipeRuntimeAgent.java:142)
        at org.apache.iotdb.db.pipe.event.EnrichedEvent.reportException(EnrichedEvent.java:220)
        at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.onFailure(PipeConnectorSubtask.java:253)
        - locked <0x000000054e91ea58> (a org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask)
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1119)
        at org.apache.iotdb.commons.concurrent.WrappedRunnable$1.runMayThrow(WrappedRunnable.java:45)
        at org.apache.iotdb.commons.concurrent.WrappedRunnable.run(WrappedRunnable.java:30)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.8/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.8/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)

Found 1 deadlock.
```
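The trace above is a classic lock-order inversion: the RPC thread acquires the agent monitor and then waits on the subtask monitor, while the callback thread holds the subtask monitor and waits on the agent monitor. As a minimal illustration only (hypothetical names, not the actual IoTDB classes, and not necessarily the approach taken in this PR), one generic way to break such a cycle is to stop blocking unconditionally on the second lock:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of breaking a lock-order inversion with a bounded tryLock.
public class LockOrderingSketch {

  static final ReentrantLock agentLock = new ReentrantLock();
  static final ReentrantLock subtaskLock = new ReentrantLock();

  // Models the callback path (onFailure -> stopAllPipesWithCriticalException):
  // it runs while holding the subtask lock, so it must not wait forever on the
  // agent lock, which another thread may hold while waiting on the subtask lock.
  static boolean reportWhileHoldingSubtaskLock() {
    subtaskLock.lock();
    try {
      try {
        if (agentLock.tryLock(100, TimeUnit.MILLISECONDS)) {
          try {
            return true; // the critical-exception handling would run here
          } finally {
            agentLock.unlock();
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return false; // back off and let the caller retry instead of deadlocking
    } finally {
      subtaskLock.unlock();
    }
  }

  public static void main(String[] args) {
    System.out.println(reportWhileHoldingSubtaskLock());
  }
}
```

With no contention the method succeeds immediately; under the inverted ordering above it would return `false` after the timeout rather than hang. An alternative remedy is to defer the report to a thread that holds no subtask lock at all.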
2. Fix concurrent issues caused by addFailureEventToRetryQueue & transferQueuedEventsIfNecessary

```
2024-01-15 15:13:53,529 [pool-50-IoTDB-Pipe-DataRegion-Connector-Executor-Pool-4] INFO  o.a.i.d.p.c.p.t.s.IoTDBThriftSyncConnector:471 - Successfully transferred file /data/iotdb/apache-iotdb-1.3.1-SNAPSHOT-all-bin/data/datanode/data/pipe/tsfile/sequence-root.test.g_0-2-2721-1705043651661-61-1-0.tsfile.
...
2024-01-15 15:14:01,200 [pool-2-IoTDB-Pipe-Runtime-Periodical-Job-Executor-1] INFO  o.a.i.d.p.r.t.PipeTsFileResource:126 - PipeTsFileResource: Closed tsfile /data/iotdb/apache-iotdb-1.3.1-SNAPSHOT-all-bin/data/datanode/data/pipe/tsfile/sequence-root.test.g_0-2-2721-1705043651661-61-1-0.tsfile and cleaned up.
...
2024-01-15 15:14:05,702 [pool-50-IoTDB-Pipe-DataRegion-Connector-Executor-Pool-4] ERROR o.a.i.c.c.t.WrappedThreadPoolExecutor:129 - Exception in thread pool org.apache.iotdb.threadpool:type=Pipe-DataRegion-Connector-Executor-Pool
org.apache.iotdb.pipe.api.exception.PipeConnectionException: Failed to transfer tsfile insertion event PipeTsFileInsertionEvent{resource=file is /data/iotdb/apache-iotdb-1.3.1-SNAPSHOT-all-bin/data/datanode/data/sequence/root.test.g_0/2/2721/1705043651661-61-1-0.tsfile, status: NORMAL, tsFile=/data/iotdb/apache-iotdb-1.3.1-SNAPSHOT-all-bin/data/datanode/data/pipe/tsfile/sequence-root.test.g_0-2-2721-1705043651661-61-1-0.tsfile, isClosed=true}, because Seal file /data/iotdb/apache-iotdb-1.3.1-SNAPSHOT-all-bin/data/datanode/data/pipe/tsfile/sequence-root.test.g_0-2-2721-1705043651661-61-1-0.tsfile error, result status TSStatus(code:1804, message:Failed to seal file sequence-root.test.g_0-2-2721-1705043651661-61-1-0.tsfile, because the length of file is not correct. The original file has length 0, but receiver file has length 117603772.)..
	at org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector.transfer(IoTDBThriftSyncConnector.java:272)
	at org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector.transferQueuedEventsIfNecessary(IoTDBThriftAsyncConnector.java:351)
	at org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector.transfer(IoTDBThriftAsyncConnector.java:250)
	at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.executeOnce(PipeConnectorSubtask.java:140)
	at org.apache.iotdb.commons.pipe.task.subtask.PipeSubtask.call(PipeSubtask.java:75)
	at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.call(PipeConnectorSubtask.java:108)
	at org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.call(PipeConnectorSubtask.java:51)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.iotdb.pipe.api.exception.PipeException: Seal file /data/iotdb/apache-iotdb-1.3.1-SNAPSHOT-all-bin/data/datanode/data/pipe/tsfile/sequence-root.test.g_0-2-2721-1705043651661-61-1-0.tsfile error, result status TSStatus(code:1804, message:Failed to seal file sequence-root.test.g_0-2-2721-1705043651661-61-1-0.tsfile, because the length of file is not correct. The original file has length 0, but receiver file has length 117603772.).
	at org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector.doTransfer(IoTDBThriftSyncConnector.java:469)
	at org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector.transfer(IoTDBThriftSyncConnector.java:269)
	... 12 common frames omitted
```
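The log shows a retry transferring a pipe tsfile whose local copy had already been closed and cleaned up by the periodic job, so the sender read a zero-length file. As an illustration only (hypothetical names, not the actual IoTDB code), this kind of race can be avoided by putting the failure path and the retry path under one monitor and keeping a reference on the event's file while it waits in the retry queue:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: both entry points synchronize on the same monitor, and a
// queued event holds an extra reference so cleanup cannot reclaim its tsfile
// while a retry is still pending.
public class RetryQueueSketch {

  public static class Event {
    public int refCount = 1;

    void retain() { refCount++; }

    void release() { refCount--; } // the file is cleaned up only at refCount == 0
  }

  private final Queue<Event> retryQueue = new ArrayDeque<>();

  // Called from the async callback on failure.
  public synchronized void addFailureEventToRetryQueue(Event event) {
    event.retain(); // keep the pipe tsfile alive while the event waits in the queue
    retryQueue.add(event);
  }

  // Called from the connector thread before transferring new events.
  public synchronized void transferQueuedEventsIfNecessary() {
    Event event;
    while ((event = retryQueue.poll()) != null) {
      // the synchronous re-transfer of the event would happen here
      event.release(); // only after the retry may the cleanup job reclaim the file
    }
  }
}
```

Under this scheme the periodic cleanup job, which decrements the original reference, can never see the count drop to zero between enqueue and retry.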


codecov-commenter commented Jan 14, 2024

Codecov Report

Attention: 29 lines in your changes are missing coverage. Please review.

Comparison is base (5d34371) 49.00% compared to head (bf449f4) 48.98%.
Report is 1 commit behind head on master.

Files Patch % Lines
...otdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java 0.00% 16 Missing ⚠️
...e/iotdb/commons/pipe/agent/task/PipeTaskAgent.java 0.00% 6 Missing ⚠️
...e/iotdb/commons/pipe/task/subtask/PipeSubtask.java 50.00% 3 Missing ⚠️
...nager/pipe/coordinator/runtime/PipeMetaSyncer.java 0.00% 2 Missing ⚠️
.../iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java 0.00% 1 Missing ⚠️
...e/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #11893      +/-   ##
============================================
- Coverage     49.00%   48.98%   -0.02%     
- Complexity    25722    25724       +2     
============================================
  Files          2910     2910              
  Lines        182063   182090      +27     
  Branches      21827    21830       +3     
============================================
- Hits          89212    89205       -7     
- Misses        92851    92885      +34     


@SteveYurongSu SteveYurongSu self-assigned this Jan 15, 2024
@SteveYurongSu SteveYurongSu changed the title Pipe: Fix deadlock when PipeTaskAgent.handlePipeMetaChanges and PipeTaskDataNodeAgent.stopAllPipesWithCriticalException are invoked concurrently Pipe: Fix deadlock when PipeTaskAgent.handlePipeMetaChanges and PipeTaskDataNodeAgent.stopAllPipesWithCriticalException are invoked concurrently & Fix concurrent issues caused by addFailureEventToRetryQueue & transferQueuedEventsIfNecessary Jan 15, 2024
@sonarqubecloud

Quality Gate failed

Failed conditions

0.0% Coverage on New Code (required ≥ 80%)

See analysis details on SonarCloud

@SteveYurongSu SteveYurongSu merged commit bcfeafe into master Jan 15, 2024
@SteveYurongSu SteveYurongSu deleted the fix-pipe-deadlock branch January 15, 2024 11:29
SzyWilliam pushed a commit to SzyWilliam/iotdb that referenced this pull request Nov 26, 2024
…askDataNodeAgent.stopAllPipesWithCriticalException are invoked concurrently & Fix concurrent issues caused by addFailureEventToRetryQueue & transferQueuedEventsIfNecessary (apache#11893)
