Skip to content

Commit

Permalink
ARTEMIS-2462 Allow store-forward queue to be deleted after scaledown
Browse files Browse the repository at this point in the history
After a node is scaled down to a target node, the sf queue in the
target node is not deleted.

Normally this is fine because it may be reused when the scaled down
node is back up.

However, in a cloud environment many drainer pods can be created and
then shut down in order to drain the messages to a live node (pod).
Each drainer pod will have a different node-id. Over time the number of
sf queues in the target broker node grows, and those sf queues are
no longer reused.

Although users can use the management API/console to manually delete
them, it would be nice to have an option to automatically delete
those sf queue/address resources after scale down.

This PR adds a boolean configuration parameter called
cleanup-sf-queue to the scale down policy so that if the parameter
is "true" the broker will send a message to the
target broker signalling that the SF queue is no longer
needed and should be deleted.

If the parameter is not defined (default) or is "false"
the scale down won't remove the sf queue.
  • Loading branch information
howardgao committed Aug 28, 2019
1 parent 3a58387 commit 397cef6
Show file tree
Hide file tree
Showing 29 changed files with 399 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,9 @@ public static String getDefaultHapolicyBackupStrategy() {
// It's possible that you only want a server to partake in scale down as a receiver, via a group. In this case set scale-down to false
private static boolean DEFAULT_SCALE_DOWN_ENABLED = true;

// Whether the target node deletes the store-and-forward queue of the scaled-down node.
private static boolean DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE = false;

// How long to wait for a decision
private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000;

Expand Down Expand Up @@ -1531,4 +1534,8 @@ public static int getDefaultQuorumVoteWait() {
public static long getDefaultRetryReplicationWait() {
return DEFAULT_RETRY_REPLICATION_WAIT;
}

/**
 * Returns the default for whether the store-and-forward queue is cleaned up after scale down.
 *
 * @return the value of {@code DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE}
 */
public static boolean isDefaultCleanupSfQueue() {
return DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public class PacketImpl implements Packet {

public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;

public static final byte SCALEDOWN_ANNOUNCEMENT_V2 = -16;

// Static --------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ public static HAPolicy getHAPolicy(HAPolicyConfiguration conf,
public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration scaleDownConfiguration) {
if (scaleDownConfiguration != null) {
if (scaleDownConfiguration.getDiscoveryGroup() != null) {
return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
} else {
return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class ScaleDownConfiguration implements Serializable {

private boolean enabled = ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled();

private boolean cleanupSfQueue = ActiveMQDefaultConfiguration.isDefaultCleanupSfQueue();

public List<String> getConnectors() {
return connectors;
}
Expand Down Expand Up @@ -83,4 +85,13 @@ public ScaleDownConfiguration setEnabled(boolean enabled) {
this.enabled = enabled;
return this;
}

/**
 * Returns whether the store-and-forward queue should be cleaned up after scale down.
 *
 * @return the current setting, boxed from the primitive {@code cleanupSfQueue} field
 */
public Boolean isCleanupSfQueue() {
return this.cleanupSfQueue;
}

/**
 * Sets whether the store-and-forward queue should be cleaned up after scale down.
 * <p>
 * The backing field is a primitive {@code boolean}, so assigning a {@code null}
 * argument directly would fail with a {@link NullPointerException} during unboxing.
 * A {@code null} argument is therefore treated as "not specified" and the current
 * value is left untouched.
 *
 * @param cleanupSfQueue the new setting, or {@code null} to keep the current one
 * @return this configuration, so calls can be chained
 */
public ScaleDownConfiguration setCleanupSfQueue(Boolean cleanupSfQueue) {
   // Guard against unboxing null into the primitive field.
   if (cleanupSfQueue != null) {
      this.cleanupSfQueue = cleanupSfQueue;
   }
   return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,8 @@ private ScaleDownConfiguration parseScaleDownConfig(Element policyNode) {

Element scaleDownElement = (Element) scaleDownNode.item(0);

scaleDownConfiguration.setCleanupSfQueue(getBoolean(scaleDownElement, "cleanup-sf-queue", scaleDownConfiguration.isCleanupSfQueue()));

scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, "enabled", scaleDownConfiguration.isEnabled()));

NodeList discoveryGroupRef = scaleDownElement.getElementsByTagName("discovery-group-ref");
Expand Down Expand Up @@ -1791,8 +1793,6 @@ private void parseClusterConnectionConfiguration(final Element e, final Configur

int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO);

String scaleDownConnector = e.getAttribute("scale-down-connector");

String discoveryGroupName = null;

List<String> staticConnectorNames = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
Expand Down Expand Up @@ -76,6 +77,7 @@
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
Expand Down Expand Up @@ -252,6 +254,10 @@ private Packet slowPathDecode(ActiveMQBuffer in, byte packetType, CoreRemotingCo
packet = new ScaleDownAnnounceMessage();
break;
}
case SCALEDOWN_ANNOUNCEMENT_V2: {
packet = new ScaleDownAnnounceMessageV2();
break;
}
default: {
packet = super.decode(packetType, connection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@

public class ScaleDownAnnounceMessage extends PacketImpl {

private SimpleString targetNodeId;
private SimpleString scaledDownNodeId;
protected SimpleString targetNodeId;
protected SimpleString scaledDownNodeId;

public ScaleDownAnnounceMessage() {
super(SCALEDOWN_ANNOUNCEMENT);
}

/**
 * Creates an empty announcement with an explicit packet type, so that a subclass
 * can reuse this message while identifying itself with a different type byte.
 *
 * @param type the packet type passed to the {@code PacketImpl} constructor
 */
public ScaleDownAnnounceMessage(byte type) {
super(type);
}

public ScaleDownAnnounceMessage(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
super(SCALEDOWN_ANNOUNCEMENT);
this.targetNodeId = targetNodeId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;

import org.apache.activemq.artemis.api.core.SimpleString;

/**
 * Scale-down announcement identified by the {@code SCALEDOWN_ANNOUNCEMENT_V2} packet type.
 * <p>
 * It adds no fields of its own: the target node id and the scaled-down node id are
 * the ones inherited from {@link ScaleDownAnnounceMessage}. Only the packet type differs.
 */
public class ScaleDownAnnounceMessageV2 extends ScaleDownAnnounceMessage {

   /**
    * Creates an empty V2 announcement with both node ids left unset.
    */
   public ScaleDownAnnounceMessageV2() {
      super(SCALEDOWN_ANNOUNCEMENT_V2);
   }

   /**
    * Creates a V2 announcement carrying both node ids.
    *
    * @param targetNodeId     id of the node that receives the scaled-down messages
    * @param scaledDownNodeId id of the node that is being scaled down
    */
   public ScaleDownAnnounceMessageV2(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
      super(SCALEDOWN_ANNOUNCEMENT_V2);
      this.scaledDownNodeId = scaledDownNodeId;
      this.targetNodeId = targetNodeId;
   }
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,4 +450,5 @@ default void errorProcessing(Consumer consumer, Throwable t, MessageReference me

}

/**
 * Deletes this queue on behalf of the broker itself.
 * <p>
 * NOTE(review): the conditions under which deletion is allowed are defined by the
 * implementation, which is not visible here -- confirm before relying on them.
 *
 * @return presumably {@code true} when the queue was deleted; the cluster connection
 *         only removes the queue's address when this returns {@code true}
 */
boolean internalDelete();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;

Expand Down Expand Up @@ -96,4 +97,11 @@ void nodeAnnounced(long eventUID,
* @return
*/
BridgeMetrics getBridgeMetrics(String nodeId);

/**
 * Remove the store-and-forward queue after scale down
 *
 * @param scaledDownNodeId id of the scaled-down node whose store-and-forward queue is to be removed
 */
void removeSfQueue(SimpleString scaledDownNodeId);

/**
 * Remove the given store-and-forward queue.
 *
 * @param queue the queue to remove
 */
void removeSfQueue(Queue queue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
Expand Down Expand Up @@ -195,8 +196,10 @@ public boolean requestSharedStoreBackup(int backupSize,
return requestBackup(backupRequestMessage);
}

public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
ScaleDownAnnounceMessage announceMessage = new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId, boolean isCleanupSfQueue) {

ScaleDownAnnounceMessage announceMessage = isCleanupSfQueue ? new ScaleDownAnnounceMessageV2(targetNodeId, scaledDownNodeId) : new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);

clusterChannel.send(announceMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -400,12 +401,19 @@ public void handlePacket(Packet packet) {
Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT || packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
//we don't really need to check as it should always be true
if (server.getNodeID().equals(message.getTargetNodeId())) {
server.addScaledDownNode(message.getScaledDownNodeId());
}
if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
ClusterManager clusterManager = ClusterController.this.server.getClusterManager();
Set<ClusterConnection> ccs = clusterManager.getClusterConnections();
for (ClusterConnection cc : ccs) {
cc.removeSfQueue(message.getScaledDownNodeId());
}
}
} else if (channelHandler != null) {
channelHandler.handlePacket(packet);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,25 @@ public class ScaleDownPolicy {

private boolean enabled;

private boolean isCleanupSfQueue;

public ScaleDownPolicy() {
}

public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled) {
public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) {
this.connectors = connectors;
this.groupName = groupName;
this.clusterName = clusterName;
this.enabled = enabled;
this.isCleanupSfQueue = isCleanupSfQueue;
}

public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled) {
public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) {
this.discoveryGroup = discoveryGroup;
this.groupName = groupName;
this.clusterName = clusterName;
this.enabled = enabled;
this.isCleanupSfQueue = isCleanupSfQueue;
}

public List<String> getConnectors() {
Expand Down Expand Up @@ -124,4 +128,8 @@ private static TransportConfiguration[] connectorNameListToArray(final List<Stri
ActiveMQServer activeMQServer) {
return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames);
}

/**
 * Returns the clean-up flag this policy was constructed with.
 *
 * @return {@code true} if the store-and-forward queue should be cleaned up after scale down
 */
public boolean isCleanupSfQueue() {
return this.isCleanupSfQueue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,9 @@ protected void nodeUP(TopologyMember member, boolean last) {

}

/**
 * Extension hook for subclasses; this default implementation does nothing.
 * It is called from the stop runnable after the bridge-stopped message has been logged.
 */
protected void postStop() {
}


// Inner classes -------------------------------------------------

Expand Down Expand Up @@ -1229,6 +1232,7 @@ public void run() {
logger.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
}
ActiveMQServerLogger.LOGGER.bridgeStopped(name);
postStop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,11 @@ protected void nodeUP(TopologyMember member, boolean last) {
super.nodeUP(member, last);
}

/**
 * After this bridge has stopped, asks the owning cluster connection to remove
 * the queue this bridge was using.
 */
@Override
protected void postStop() {
clusterConnection.removeSfQueue(queue);
}


@Override
protected void afterConnect() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ public void nodeUP(final TopologyMember topologyMember, final boolean last) {

// New node - create a new flow record

final SimpleString queueName = new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
final SimpleString queueName = getSfQueueName(nodeID);

Binding queueBinding = postOffice.getBinding(queueName);

Expand Down Expand Up @@ -741,6 +741,10 @@ public void nodeUP(final TopologyMember topologyMember, final boolean last) {
}
}

/**
 * Builds the name of the store-and-forward queue used for a given node.
 * The name is the store-and-forward prefix, followed by this cluster connection's
 * name, a dot, and the node id.
 *
 * @param nodeID id of the node the queue belongs to
 * @return the store-and-forward queue name for that node
 */
public SimpleString getSfQueueName(String nodeID) {
   final String sfQueueName = storeAndForwardPrefix + name + "." + nodeID;
   return new SimpleString(sfQueueName);
}

@Override
public synchronized void informClusterOfBackup() {
String nodeID = server.getNodeID().toString();
Expand Down Expand Up @@ -770,6 +774,27 @@ public BridgeMetrics getBridgeMetrics(String nodeId) {
return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null;
}

/**
 * Looks up the store-and-forward queue of a scaled-down node and removes it.
 * Nothing happens when no binding exists under the expected queue name.
 *
 * @param scaledDownNodeId id of the node that was scaled down
 */
@Override
public void removeSfQueue(SimpleString scaledDownNodeId) {
   final SimpleString sfQueueName = getSfQueueName(scaledDownNodeId.toString());
   final Binding sfBinding = server.getPostOffice().getBinding(sfQueueName);

   if (sfBinding == null) {
      // No store-and-forward queue is bound under that name; nothing to clean up.
      return;
   }

   removeSfQueue((Queue) sfBinding.getBindable());
}

/**
 * Deletes the given store-and-forward queue and, if that succeeds, its address.
 * A failure to remove the address is only logged at debug level.
 *
 * @param queue the store-and-forward queue to delete
 */
@Override
public void removeSfQueue(Queue queue) {
   if (!queue.internalDelete()) {
      // The queue was not deleted, so its address is left alone.
      return;
   }

   try {
      server.removeAddressInfo(queue.getAddress(), null);
   } catch (Exception e) {
      logger.debug("Failed to remove sf address: " + queue.getAddress(), e);
   }
}

private void createNewRecord(final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
Expand All @@ -49,6 +50,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
private ActiveMQServer parentServer;
private ServerLocator locator;
private final ClusterController clusterController;
private ScaleDownPolicy scaleDownPolicy;

public BackupRecoveryJournalLoader(PostOffice postOffice,
PagingManager pagingManager,
Expand All @@ -60,12 +62,14 @@ public BackupRecoveryJournalLoader(PostOffice postOffice,
Configuration configuration,
ActiveMQServer parentServer,
ServerLocatorInternal locator,
ClusterController clusterController) {
ClusterController clusterController,
ScaleDownPolicy scaleDownPolicy) {

super(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration);
this.parentServer = parentServer;
this.locator = locator;
this.clusterController = clusterController;
this.scaleDownPolicy = scaleDownPolicy;
}

@Override
Expand All @@ -87,11 +91,12 @@ public void handleDuplicateIds(Map<SimpleString, List<Pair<byte[], Long>>> dupli
public void postLoad(Journal messageJournal,
ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {

ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));

try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID());
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID(), this.scaleDownPolicy);
}
}

Expand Down
Loading

0 comments on commit 397cef6

Please sign in to comment.