From 06eb82cb1408a045037be82873c97d235946a761 Mon Sep 17 00:00:00 2001 From: yang wei Date: Mon, 28 May 2018 14:12:21 +0800 Subject: [PATCH] ARTEMIS-1891 use io executor to send replicate packet After sending pages, the thread will hold the storage manager write lock and send synchronization finished packet(use the parent thread pool) to the backup node. At the same time, thread pool is full bcs they are waiting for the storage manager read lock to write the page or journal, leading to replication starting failure. Here we use io executor to send replicate packet to fix thread pool starvation problem. --- .../artemis/core/replication/ReplicationManager.java | 10 +++++----- .../core/server/impl/SharedNothingLiveActivation.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 36422720a07..be5963a3739 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -120,7 +120,7 @@ public static ADD_OPERATION_TYPE toOperation(boolean isUpdate) { private final Queue pendingTokens = new ConcurrentLinkedQueue<>(); - private final ExecutorFactory executorFactory; + private final ExecutorFactory ioExecutorFactory; private final Executor replicationStream; @@ -142,12 +142,12 @@ public static ADD_OPERATION_TYPE toOperation(boolean isUpdate) { public ReplicationManager(CoreRemotingConnection remotingConnection, final long timeout, final long initialReplicationSyncTimeout, - final ExecutorFactory executorFactory) { - this.executorFactory = executorFactory; + final ExecutorFactory ioExecutorFactory) { + this.ioExecutorFactory = ioExecutorFactory; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1); this.remotingConnection = remotingConnection; - this.replicationStream = executorFactory.getExecutor(); + this.replicationStream = ioExecutorFactory.getExecutor(); this.timeout = timeout; } @@ -355,7 +355,7 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp return null; } - final OperationContext repliToken = OperationContextImpl.getContext(executorFactory); + final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory); if (lineUp) { repliToken.replicationLineUp(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 2e289b5acf9..c03fd198131 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -169,7 +169,7 @@ public void startReplication(CoreRemotingConnection rc, ReplicationFailureListener listener = new ReplicationFailureListener(); rc.addCloseListener(listener); rc.addFailureListener(listener); - replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory()); + replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory()); replicationManager.start(); Thread t = new Thread(new Runnable() { @Override