Permalink
Browse files

HORNETQ-1323 - Scale-down by transferring messages on clean shutdown

  • Loading branch information...
1 parent dca7e84 commit 0125ced52ae2d4c8c3412b86119af2b334c51cd7 @jbertram jbertram committed Feb 18, 2014
Showing with 2,872 additions and 395 deletions.
  1. +23 −0 docs/user-manual/en/clusters.xml
  2. +11 −0 hornetq-core-client/src/main/java/org/hornetq/api/core/client/TopologyMember.java
  3. +11 −0 hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java
  4. +5 −4 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
  5. +1 −1 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
  6. +4 −3 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
  7. +1 −1 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
  8. +2 −2 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java
  9. +14 −5 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/TopologyMemberImpl.java
  10. +28 −0 hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java
  11. +6 −0 hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
  12. +34 −16 ...q-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java
  13. +22 −1 hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
  14. +6 −0 hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
  15. +24 −11 hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
  16. +18 −35 ...src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
  17. +119 −0 ...src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V3.java
  18. +6 −1 ...q-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
  19. +107 −0 ...ore-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage_V2.java
  20. +17 −12 ...core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
  21. +119 −0 ...e-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage_V2.java
  22. +6 −0 hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
  23. +1 −1 hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java
  24. +6 −5 hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java
  25. +1 −1 hornetq-core-client/src/main/resources/hornetq-version.properties
  26. +8 −1 ...ornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonRemotingConnection.java
  27. +7 −0 ...otocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
  28. +42 −0 hornetq-server/src/main/java/org/hornetq/core/config/Configuration.java
  29. +18 −1 hornetq-server/src/main/java/org/hornetq/core/config/ConfigurationUtils.java
  30. +61 −7 hornetq-server/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java
  31. +6 −0 hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
  32. +31 −0 hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java
  33. +2 −0 hornetq-server/src/main/java/org/hornetq/core/paging/cursor/PageSubscription.java
  34. +31 −0 hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
  35. +4 −0 hornetq-server/src/main/java/org/hornetq/core/postoffice/PostOffice.java
  36. +5 −0 hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
  37. +18 −4 hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
  38. +10 −5 hornetq-server/src/main/java/org/hornetq/core/remoting/server/RemotingService.java
  39. +2 −3 hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
  40. +167 −166 hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java
  41. +27 −8 hornetq-server/src/main/java/org/hornetq/core/server/LiveNodeLocator.java
  42. +1 −1 hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java
  43. +6 −1 hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java
  44. +50 −56 hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
  45. +18 −3 ...va/org/hornetq/core/server/impl/{AnyLiveNodeLocator.java → AnyLiveNodeLocatorForReplication.java}
  46. +182 −0 hornetq-server/src/main/java/org/hornetq/core/server/impl/AnyLiveNodeLocatorForScaleDown.java
  47. +102 −8 hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
  48. +29 −4 hornetq-server/src/main/java/org/hornetq/core/server/impl/MessageReferenceImpl.java
  49. +21 −7 ...rg/hornetq/core/server/impl/{NamedLiveNodeLocator.java → NamedLiveNodeLocatorForReplication.java}
  50. +186 −0 hornetq-server/src/main/java/org/hornetq/core/server/impl/NamedLiveNodeLocatorForScaleDown.java
  51. +291 −0 hornetq-server/src/main/java/org/hornetq/core/server/impl/ScaleDownHandler.java
  52. +26 −0 hornetq-server/src/main/resources/schema/hornetq-configuration.xsd
  53. +1 −0 hornetq-server/src/test/java/org/hornetq/core/config/impl/DefaultsFileConfigurationTest.java
  54. +5 −2 hornetq-server/src/test/java/org/hornetq/core/config/impl/FileConfigurationParserTest.java
  55. +3 −0 hornetq-server/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java
  56. +0 −1 hornetq-server/src/test/java/org/hornetq/core/server/impl/ScheduledDeliveryHandlerTest.java
  57. +3 −0 hornetq-server/src/test/resources/ConfigurationTest-full-config.xml
  58. +1 −1 pom.xml
  59. +17 −12 tests/byteman-tests/pom.xml
  60. +183 −0 tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailoverTest.java
  61. +110 −0 tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownFailureTest.java
  62. +21 −0 tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownGroupedFailoverTest.java
  63. +21 −0 tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/ScaleDownGroupedFailureTest.java
  64. +6 −1 ...ation-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
  65. +1 −0 ...ration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java
  66. +10 −0 ...sts/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
  67. +479 −0 tests/integration-tests/src/test/java/org/hornetq/tests/integration/server/ScaleDownTest.java
  68. +8 −4 tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
  69. +52 −0 tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/QueueComparatorTest.java
  70. +8 −0 tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
View
23 docs/user-manual/en/clusters.xml
@@ -972,4 +972,27 @@ locator.setLoadBalancingPolicyClassName("com.acme.MyLoadBalancingPolicy");</prog
C which does have consumers.</para>
</section>
</section>
+ <section>
+ <title>Scaling Down</title>
+ <para>HornetQ supports scaling down a cluster with no message loss (even for non-durable messages). This is especially
+ useful in certain environments (e.g. the cloud) where the size of a cluster may change relatively frequently.
+ When scaling up a cluster (i.e. adding nodes) there is no risk of message loss, but when scaling down a cluster
+ (i.e. removing nodes) the messages on those nodes would be lost unless the broker sent them to another node in
+ the cluster. HornetQ can be configured to do just that.</para>
+ <para>The simplest way to enable this behavior is to set <literal>scale-down</literal> to
+ <literal>true</literal>. If the server is clustered and <literal>scale-down</literal> is
+ <literal>true</literal> then when the server is shutdown gracefully (i.e. stopped without crashing) it will find
+ another node in the cluster and send <emphasis>all</emphasis> of its messages (both durable and non-durable)
+ to that node. The messages are processed in order and go to the <emphasis>back</emphasis> of the respective
+ queues on the other node (just as if the messages were sent from an external client for the first time).</para>
+ <para>If more control over where the messages go is required then specify <literal>scale-down-group-name</literal>.
+ Messages will only be sent to another node in the cluster that uses the same <literal>scale-down-group-name</literal>
+ as the server being shutdown.</para>
+ <warning>
+ <para>If cluster nodes are grouped together with different <literal>scale-down-group-name</literal> values beware.
+ If all the nodes in a single group are shut down then the messages from that node/group will be lost.</para>
+ </warning>
+ <para>If the server is using multiple <literal>cluster-connection</literal> then use <literal>scale-down-clustername</literal>
+ to identify the name of the <literal>cluster-connection</literal> which should be used for scaling down.</para>
+ </section>
</chapter>
View
11 hornetq-core-client/src/main/java/org/hornetq/api/core/client/TopologyMember.java
@@ -42,6 +42,17 @@
String getBackupGroupName();
/**
+ * Returns the {@code scale-down-group-name} of the live server with this Topology entry.
+ * <p/>
+ * This is a server configuration value. a live server will only send its messages to another live server
+ * with matching {@code scale-down-group-name}.
+ * <p/>
+ *
+ * @return the {@code scale-down-group-name}
+ */
+ String getScaleDownGroupName();
+
+ /**
* @return configuration relative to the live server
*/
TransportConfiguration getLive();
View
11 hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java
@@ -147,6 +147,17 @@
boolean isFailoverOnServerShutdown();
/**
+ * does the server scale down on clean shutdown
+ */
+ void setScaleDown(boolean scaleDown) throws Exception;
+
+
+ /**
+ * returns if server scales down on clean shutdown
+ */
+ boolean isScaleDown();
+
+ /**
* Returns the minimal number of journal files before compacting.
*/
int getJournalCompactMinFiles();
View
9 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
@@ -1183,12 +1183,13 @@ public ConfirmationWindowWarning getConfirmationWindowWarning()
public void sendNodeAnnounce(final long currentEventID,
String nodeID,
- String nodeName,
+ String backupGroupName,
+ String scaleDownGroupName,
boolean isBackup,
TransportConfiguration config,
TransportConfiguration backupConfig)
{
- clientProtocolManager.sendNodeAnnounce(currentEventID, nodeID, nodeName, isBackup, config, backupConfig);
+ clientProtocolManager.sendNodeAnnounce(currentEventID, nodeID, backupGroupName, scaleDownGroupName, isBackup, config, backupConfig);
}
protected Connection openTransportConnection(final Connector connector)
@@ -1550,9 +1551,9 @@ public void nodeDisconnected(RemotingConnection conn, String nodeID)
}
@Override
- public void notifyNodeUp(long uniqueEventID, String nodeID, String nodeName, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean isLast)
+ public void notifyNodeUp(long uniqueEventID, String nodeID, String backupGroupName, String scaleDownGroupName, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean isLast)
{
- serverLocator.notifyNodeUp(uniqueEventID, nodeID, nodeName, connectorPair, isLast);
+ serverLocator.notifyNodeUp(uniqueEventID, nodeID, backupGroupName, scaleDownGroupName, connectorPair, isLast);
}
@Override
View
2 ...-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
@@ -46,7 +46,7 @@
void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws HornetQException;
- void sendNodeAnnounce(final long currentEventID, String nodeID, String nodeName, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig);
+ void sendNodeAnnounce(final long currentEventID, String nodeID, String backupGroupName, String scaleDownGroupName, boolean isBackup, TransportConfiguration config, TransportConfiguration backupConfig);
TransportConfiguration getConnectorConfiguration();
View
7 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
@@ -1580,7 +1580,8 @@ public void notifyNodeDown(final long eventTime, final String nodeID)
public void notifyNodeUp(long uniqueEventID,
final String nodeID,
- final String nodeName,
+ final String backupGroupName,
+ final String scaleDownGroupName,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -1589,7 +1590,7 @@ public void notifyNodeUp(long uniqueEventID,
HornetQClientLogger.LOGGER.trace("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
}
- TopologyMemberImpl member = new TopologyMemberImpl(nodeID, nodeName, connectorPair.getA(), connectorPair.getB());
+ TopologyMemberImpl member = new TopologyMemberImpl(nodeID, backupGroupName, scaleDownGroupName, connectorPair.getA(), connectorPair.getB());
topology.updateMember(uniqueEventID, nodeID, member);
@@ -1677,7 +1678,7 @@ public synchronized void connectorsChanged(List<DiscoveryEntry> newConnectors)
if (ha && topology.getMember(entry.getNodeID()) == null)
{
- TopologyMemberImpl member = new TopologyMemberImpl(entry.getNodeID(), null, entry.getConnector(), null);
+ TopologyMemberImpl member = new TopologyMemberImpl(entry.getNodeID(), null, null, entry.getConnector(), null);
// on this case we set it as zero as any update coming from server should be accepted
topology.updateMember(0, entry.getNodeID(), member);
}
View
2 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
@@ -60,7 +60,7 @@
*/
ClientSessionFactoryInternal connectNoWarnings() throws HornetQException;
- void notifyNodeUp(long uniqueEventID, String nodeID, String nodeName,
+ void notifyNodeUp(long uniqueEventID, String nodeID, String backupGroupName, String scaleDownGroupName,
Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
/**
View
4 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java
@@ -164,7 +164,7 @@ public TopologyMemberImpl updateBackup(final TopologyMemberImpl memberInput)
}
TopologyMemberImpl newMember =
- new TopologyMemberImpl(nodeId, currentMember.getBackupGroupName(), currentMember.getLive(),
+ new TopologyMemberImpl(nodeId, currentMember.getBackupGroupName(), currentMember.getScaleDownGroupName(), currentMember.getLive(),
memberInput.getBackup());
newMember.setUniqueEventID(System.currentTimeMillis());
topology.remove(nodeId);
@@ -216,7 +216,7 @@ public boolean updateMember(final long uniqueEventID, final String nodeId, final
if (uniqueEventID > currentMember.getUniqueEventID())
{
TopologyMemberImpl newMember =
- new TopologyMemberImpl(nodeId, memberInput.getBackupGroupName(), memberInput.getLive(),
+ new TopologyMemberImpl(nodeId, memberInput.getBackupGroupName(), memberInput.getScaleDownGroupName(), memberInput.getLive(),
memberInput.getBackup());
if (newMember.getLive() == null && currentMember.getLive() != null)
View
19 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/TopologyMemberImpl.java
@@ -27,7 +27,9 @@
private final Pair<TransportConfiguration, TransportConfiguration> connector;
- private final String nodeName;
+ private final String backupGroupName;
+
+ private final String scaleDownGroupName;
/**
* transient to avoid serialization changes
@@ -36,11 +38,12 @@
private final String nodeId;
- public TopologyMemberImpl(String nodeId, final String nodeName, final TransportConfiguration a,
+ public TopologyMemberImpl(String nodeId, final String backupGroupName, final String scaleDownGroupName, final TransportConfiguration a,
final TransportConfiguration b)
{
this.nodeId = nodeId;
- this.nodeName = nodeName;
+ this.backupGroupName = backupGroupName;
+ this.scaleDownGroupName = scaleDownGroupName;
this.connector = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
uniqueEventID = System.currentTimeMillis();
}
@@ -82,7 +85,13 @@ public long getUniqueEventID()
@Override
public String getBackupGroupName()
{
- return nodeName;
+ return backupGroupName;
+ }
+
+ @Override
+ public String getScaleDownGroupName()
+ {
+ return scaleDownGroupName;
}
/**
@@ -124,6 +133,6 @@ public boolean isMember(TransportConfiguration configuration)
@Override
public String toString()
{
- return "TopologyMember[name = " + nodeName + ", connector=" + connector + "]";
+ return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]";
}
}
View
28 hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java
@@ -902,6 +902,34 @@ public TypedProperties getTypedProperties()
return this.properties;
}
+ @Override
+ public boolean equals(Object other)
+ {
+
+ if (this == other)
+ {
+ return true;
+ }
+
+ if (other instanceof MessageImpl)
+ {
+ MessageImpl message = (MessageImpl) other;
+
+ if (this.getMessageID() == message.getMessageID())
+ return true;
+ }
+
+ return false;
+ }
+
+
+
+ @Override
+ public int hashCode()
+ {
+ return 31 + (int)(messageID ^ (messageID >>> 32));
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
View
6 hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
@@ -149,6 +149,12 @@ public boolean supports(final byte packetType)
return version >= 122;
case PacketImpl.DISCONNECT_CONSUMER:
return version >= 124;
+ case PacketImpl.CLUSTER_TOPOLOGY_V3:
+ return version >= 125;
+ case PacketImpl.NODE_ANNOUNCE_V2:
+ return version >= 125;
+ case PacketImpl.DISCONNECT_V2:
+ return version >= 125;
default:
return true;
}
View
50 ...lient/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java
@@ -35,10 +35,11 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.hornetq.core.version.Version;
@@ -99,7 +100,6 @@
private final CountDownLatch waitLatch = new CountDownLatch(1);
-
public HornetQClientProtocolManager(ClientSessionFactoryInternal factory)
{
this.factoryInternal = factory;
@@ -171,7 +171,7 @@ public Lock lockSessionCreation()
{
localFailoverLock.unlock();
}
- // We can now release the failoverLock
+ // We can now release the failoverLock
}
catch (InterruptedException e)
{
@@ -210,7 +210,6 @@ public boolean isAlive()
}
-
public void setConnection(RemotingConnection connection)
{
this.connection = (RemotingConnectionImpl) connection;
@@ -245,14 +244,20 @@ public void setResponseHandler(ProtocolResponseHandler handler)
public void sendSubscribeTopology(final boolean isServer)
{
getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer,
- VersionLoader.getVersion()
- .getIncrementingVersion()));
+ VersionLoader.getVersion()
+ .getIncrementingVersion()));
}
- public void sendNodeAnnounce(final long currentEventID, final String nodeID, final String nodeName,
- final boolean isBackup, final TransportConfiguration config, final TransportConfiguration backupConfig)
+ public void sendNodeAnnounce(final long currentEventID,
+ String nodeID,
+ String backupGroupName,
+ String scaleDownGroupName,
+ boolean isBackup,
+ TransportConfiguration config,
+ TransportConfiguration backupConfig)
{
- getChannel0().send(new NodeAnnounceMessage(currentEventID, nodeID, nodeName, isBackup, config, backupConfig));
+ Channel channel0 = connection.getChannel(0, -1);
+ channel0.send(new NodeAnnounceMessage_V2(currentEventID, nodeID, backupGroupName, scaleDownGroupName, isBackup, config, backupConfig));
}
@Override
@@ -456,7 +461,7 @@ public void handlePacket(final Packet packet)
{
final byte type = packet.getType();
- if (type == PacketImpl.DISCONNECT)
+ if (type == PacketImpl.DISCONNECT || type == PacketImpl.DISCONNECT_V2)
{
final DisconnectMessage msg = (DisconnectMessage) packet;
@@ -475,6 +480,11 @@ else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet;
notifyTopologyChange(topMessage);
}
+ else if (type == PacketImpl.CLUSTER_TOPOLOGY_V3)
+ {
+ ClusterTopologyChangeMessage_V3 topMessage = (ClusterTopologyChangeMessage_V3) packet;
+ notifyTopologyChange(topMessage);
+ }
}
/**
@@ -483,17 +493,25 @@ else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage)
{
final long eventUID;
- final String nodeName;
-
- if (topMessage instanceof ClusterTopologyChangeMessage_V2)
+ final String backupGroupName;
+ final String scaleDownGroupName;
+ if (topMessage instanceof ClusterTopologyChangeMessage_V3)
+ {
+ eventUID = ((ClusterTopologyChangeMessage_V3) topMessage).getUniqueEventID();
+ backupGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getBackupGroupName();
+ scaleDownGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getScaleDownGroupName();
+ }
+ else if (topMessage instanceof ClusterTopologyChangeMessage_V2)
{
eventUID = ((ClusterTopologyChangeMessage_V2) topMessage).getUniqueEventID();
- nodeName = ((ClusterTopologyChangeMessage_V2) topMessage).getNodeName();
+ backupGroupName = ((ClusterTopologyChangeMessage_V2) topMessage).getBackupGroupName();
+ scaleDownGroupName = null;
}
else
{
eventUID = System.currentTimeMillis();
- nodeName = null;
+ backupGroupName = null;
+ scaleDownGroupName = null;
}
if (topMessage.isExit())
@@ -517,7 +535,7 @@ private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage)
}
if (callbackHandler != null)
- callbackHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), nodeName, transportConfig, topMessage.isLast());
+ callbackHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast());
}
}
}
View
23 hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
@@ -14,15 +14,18 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V2;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V3;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT_V2;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE_V2;
import static org.hornetq.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PING;
@@ -54,6 +57,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_END;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_FORGET;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
@@ -67,7 +71,6 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
-import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY_V2;
@@ -79,14 +82,17 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
@@ -157,6 +163,11 @@ public Packet decode(byte packetType)
packet = new DisconnectMessage();
break;
}
+ case DISCONNECT_V2:
+ {
+ packet = new DisconnectMessage_V2();
+ break;
+ }
case DISCONNECT_CONSUMER:
{
packet = new DisconnectConsumerMessage();
@@ -412,11 +423,21 @@ public Packet decode(byte packetType)
packet = new ClusterTopologyChangeMessage_V2();
break;
}
+ case CLUSTER_TOPOLOGY_V3:
+ {
+ packet = new ClusterTopologyChangeMessage_V3();
+ break;
+ }
case NODE_ANNOUNCE:
{
packet = new NodeAnnounceMessage();
break;
}
+ case NODE_ANNOUNCE_V2:
+ {
+ packet = new NodeAnnounceMessage_V2();
+ break;
+ }
case SUBSCRIBE_TOPOLOGY:
{
packet = new SubscribeClusterTopologyUpdatesMessage();
View
6 hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
@@ -216,6 +216,12 @@
public static final byte REPLICATION_START_FINISH_SYNC = 120;
public static final byte REPLICATION_SCHEDULED_FAILOVER = 121;
+ public static final byte CLUSTER_TOPOLOGY_V3 = 122;
+
+ public static final byte NODE_ANNOUNCE_V2 = 123;
+
+ public static final byte DISCONNECT_V2 = 124;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
View
35 ...core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -26,16 +26,18 @@
import org.hornetq.api.core.HornetQInterruptedException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.client.HornetQClientLogger;
+import org.hornetq.core.client.HornetQClientMessageBundle;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.security.HornetQPrincipal;
-import org.hornetq.core.client.HornetQClientLogger;
-import org.hornetq.core.client.HornetQClientMessageBundle;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -162,20 +164,18 @@ private RemotingConnectionImpl(final PacketDecoder packetDecoder,
}
-
-
// RemotingConnection implementation
// ------------------------------------------------------------
@Override
public String toString()
{
return "RemotingConnectionImpl [clientID=" + clientID +
- ", nodeID=" +
- nodeID +
- ", transportConnection=" +
- transportConnection +
- "]";
+ ", nodeID=" +
+ nodeID +
+ ", transportConnection=" +
+ transportConnection +
+ "]";
}
public Connection getTransportConnection()
@@ -368,6 +368,11 @@ public void destroy()
public void disconnect(final boolean criticalError)
{
+ disconnect(null, criticalError);
+ }
+
+ public void disconnect(TransportConfiguration transportConfiguration, final boolean criticalError)
+ {
Channel channel0 = getChannel(0, -1);
// And we remove all channels from the connection, this ensures no more packets will be processed after this
@@ -391,13 +396,21 @@ public void disconnect(final boolean criticalError)
if (!criticalError)
{
- for (Channel channel: allChannels)
+ for (Channel channel : allChannels)
{
channel.flushConfirmations();
}
}
+ Packet disconnect;
- Packet disconnect = new DisconnectMessage(nodeID);
+ if (transportConfiguration != null && channel0.supports(PacketImpl.DISCONNECT_V2))
+ {
+ disconnect = new DisconnectMessage_V2(nodeID, transportConfiguration);
+ }
+ else
+ {
+ disconnect = new DisconnectMessage(nodeID);
+ }
channel0.sendAndFlush(disconnect);
}
View
53 .../java/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
@@ -21,10 +21,10 @@
*/
public class ClusterTopologyChangeMessage_V2 extends ClusterTopologyChangeMessage
{
- private long uniqueEventID;
- private String nodeName;
+ protected long uniqueEventID;
+ protected String backupGroupName;
- public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID, final String nodeName,
+ public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID, final String backupGroupName,
final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
{
super(CLUSTER_TOPOLOGY_V2);
@@ -39,7 +39,7 @@ public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String no
this.uniqueEventID = uniqueEventID;
- this.nodeName = nodeName;
+ this.backupGroupName = backupGroupName;
}
public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID)
@@ -58,6 +58,11 @@ public ClusterTopologyChangeMessage_V2()
super(CLUSTER_TOPOLOGY_V2);
}
+ public ClusterTopologyChangeMessage_V2(byte clusterTopologyV3)
+ {
+ super(clusterTopologyV3);
+ }
+
/**
* @return the uniqueEventID
*/
@@ -66,9 +71,9 @@ public long getUniqueEventID()
return uniqueEventID;
}
- public String getNodeName()
+ public String getBackupGroupName()
{
- return nodeName;
+ return backupGroupName;
}
@Override
@@ -99,7 +104,7 @@ public void encodeRest(final HornetQBuffer buffer)
}
buffer.writeBoolean(last);
}
- buffer.writeNullableString(nodeName);
+ buffer.writeNullableString(backupGroupName);
}
@Override
@@ -137,7 +142,7 @@ public void decodeRest(final HornetQBuffer buffer)
}
if (buffer.readableBytes() > 0)
{
- nodeName = buffer.readNullableString();
+ backupGroupName = buffer.readNullableString();
}
}
@@ -146,10 +151,7 @@ public int hashCode()
{
final int prime = 31;
int result = super.hashCode();
- result = prime * result + (exit ? 1231 : 1237);
- result = prime * result + (last ? 1231 : 1237);
- result = prime * result + ((nodeID == null) ? 0 : nodeID.hashCode());
- result = prime * result + ((pair == null) ? 0 : pair.hashCode());
+ result = prime * result + ((backupGroupName == null) ? 0 : backupGroupName.hashCode());
result = prime * result + (int) (uniqueEventID ^ (uniqueEventID >>> 32));
return result;
}
@@ -170,37 +172,18 @@ public boolean equals(Object obj)
return false;
}
ClusterTopologyChangeMessage_V2 other = (ClusterTopologyChangeMessage_V2) obj;
- if (exit != other.exit)
- {
- return false;
- }
- if (last != other.last)
- {
- return false;
- }
- if (nodeID == null)
- {
- if (other.nodeID != null)
- {
- return false;
- }
- }
- else if (!nodeID.equals(other.nodeID))
+ if (uniqueEventID != other.uniqueEventID)
{
return false;
}
- if (pair == null)
+ if (backupGroupName == null)
{
- if (other.pair != null)
+ if (other.backupGroupName != null)
{
return false;
}
}
- else if (!pair.equals(other.pair))
- {
- return false;
- }
- if (uniqueEventID != other.uniqueEventID)
+ else if (!backupGroupName.equals(other.backupGroupName))
{
return false;
}
View
119 .../java/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V3.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * @author Justin Bertram
+ */
+public class ClusterTopologyChangeMessage_V3 extends ClusterTopologyChangeMessage_V2
+{
+ private String scaleDownGroupName;
+
+ public ClusterTopologyChangeMessage_V3(final long uniqueEventID, final String nodeID, final String backupGroupName, final String scaleDownGroupName,
+ final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
+ {
+ super(CLUSTER_TOPOLOGY_V3);
+
+ this.nodeID = nodeID;
+
+ this.pair = pair;
+
+ this.last = last;
+
+ this.exit = false;
+
+ this.uniqueEventID = uniqueEventID;
+
+ this.backupGroupName = backupGroupName;
+
+ this.scaleDownGroupName = scaleDownGroupName;
+ }
+
+ public ClusterTopologyChangeMessage_V3(final long uniqueEventID, final String nodeID)
+ {
+ super(CLUSTER_TOPOLOGY_V3);
+
+ this.exit = true;
+
+ this.nodeID = nodeID;
+
+ this.uniqueEventID = uniqueEventID;
+ }
+
+ public ClusterTopologyChangeMessage_V3()
+ {
+ super(CLUSTER_TOPOLOGY_V3);
+ }
+
+ public String getScaleDownGroupName()
+ {
+ return scaleDownGroupName;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ super.encodeRest(buffer);
+ buffer.writeNullableString(scaleDownGroupName);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ super.decodeRest(buffer);
+ scaleDownGroupName = buffer.readNullableString();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + ((scaleDownGroupName == null) ? 0 : scaleDownGroupName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (!super.equals(obj))
+ {
+ return false;
+ }
+ if (!(obj instanceof ClusterTopologyChangeMessage_V3))
+ {
+ return false;
+ }
+ ClusterTopologyChangeMessage_V3 other = (ClusterTopologyChangeMessage_V3) obj;
+ if (scaleDownGroupName == null)
+ {
+ if (other.scaleDownGroupName != null)
+ {
+ return false;
+ }
+ }
+ else if (!scaleDownGroupName.equals(other.scaleDownGroupName))
+ {
+ return false;
+ }
+ return true;
+ }
+}
View
7 ...lient/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
@@ -22,7 +22,7 @@
// Attributes ----------------------------------------------------
- private SimpleString nodeID;
+ protected SimpleString nodeID;
// Static --------------------------------------------------------
@@ -40,6 +40,11 @@ public DisconnectMessage()
super(DISCONNECT);
}
+ public DisconnectMessage(byte disconnectV2)
+ {
+ super(disconnectV2);
+ }
+
// Public --------------------------------------------------------
public SimpleString getNodeID()
View
107 ...nt/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage_V2.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+
+public class DisconnectMessage_V2 extends DisconnectMessage
+{
+ private TransportConfiguration scaleDownDestination;
+
+ public DisconnectMessage_V2(final SimpleString nodeID, final TransportConfiguration scaleDownDestination)
+ {
+ super(DISCONNECT_V2);
+
+ this.nodeID = nodeID;
+
+ this.scaleDownDestination = scaleDownDestination;
+ }
+
+ public DisconnectMessage_V2()
+ {
+ super(DISCONNECT_V2);
+ }
+
+ // Public --------------------------------------------------------
+
+ public TransportConfiguration getScaleDownDestination()
+ {
+ return scaleDownDestination;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ super.encodeRest(buffer);
+ scaleDownDestination.encode(buffer);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ super.decodeRest(buffer);
+ scaleDownDestination = new TransportConfiguration();
+ scaleDownDestination.decode(buffer);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuffer buf = new StringBuffer(getParentString());
+ buf.append(", nodeID=" + nodeID);
+ buf.append(", scaleDownDestination=" + scaleDownDestination);
+ buf.append("]");
+ return buf.toString();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + ((scaleDownDestination == null) ? 0 : scaleDownDestination.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (!super.equals(obj))
+ {
+ return false;
+ }
+ if (!(obj instanceof DisconnectMessage_V2))
+ {
+ return false;
+ }
+ DisconnectMessage_V2 other = (DisconnectMessage_V2) obj;
+ if (scaleDownDestination == null)
+ {
+ if (other.scaleDownDestination != null)
+ {
+ return false;
+ }
+ }
+ else if (!scaleDownDestination.equals(other.scaleDownDestination))
+ {
+ return false;
+ }
+ return true;
+ }
+}
View
29 ...ent/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
@@ -22,31 +22,31 @@
*/
public class NodeAnnounceMessage extends PacketImpl
{
- private String nodeID;
+ protected String nodeID;
- private String nodeName;
+ protected String backupGroupName;
- private boolean backup;
+ protected boolean backup;
- private long currentEventID;
+ protected long currentEventID;
- private TransportConfiguration connector;
+ protected TransportConfiguration connector;
- private TransportConfiguration backupConnector;
+ protected TransportConfiguration backupConnector;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public NodeAnnounceMessage(final long currentEventID, final String nodeID, final String nodeName, final boolean backup, final TransportConfiguration tc, final TransportConfiguration backupConnector)
+ public NodeAnnounceMessage(final long currentEventID, final String nodeID, final String backupGroupName, final boolean backup, final TransportConfiguration tc, final TransportConfiguration backupConnector)
{
super(NODE_ANNOUNCE);
this.currentEventID = currentEventID;
this.nodeID = nodeID;
- this.nodeName = nodeName;
+ this.backupGroupName = backupGroupName;
this.backup = backup;
@@ -60,6 +60,11 @@ public NodeAnnounceMessage()
super(NODE_ANNOUNCE);
}
+ public NodeAnnounceMessage(byte nodeAnnounceMessage_V2)
+ {
+ super(nodeAnnounceMessage_V2);
+ }
+
// Public --------------------------------------------------------
@@ -68,9 +73,9 @@ public String getNodeID()
return nodeID;
}
- public String getNodeName()
+ public String getBackupGroupName()
{
- return nodeName;
+ return backupGroupName;
}
public boolean isBackup()
@@ -100,7 +105,7 @@ public long getCurrentEventID()
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(nodeID);
- buffer.writeNullableString(nodeName);
+ buffer.writeNullableString(backupGroupName);
buffer.writeBoolean(backup);
buffer.writeLong(currentEventID);
if (connector != null)
@@ -127,7 +132,7 @@ public void encodeRest(final HornetQBuffer buffer)
public void decodeRest(final HornetQBuffer buffer)
{
this.nodeID = buffer.readString();
- this.nodeName = buffer.readNullableString();
+ this.backupGroupName = buffer.readNullableString();
this.backup = buffer.readBoolean();
this.currentEventID = buffer.readLong();
if (buffer.readBoolean())
View
119 .../src/main/java/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage_V2.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * @author Justin Bertram
+ */
+public class NodeAnnounceMessage_V2 extends NodeAnnounceMessage
+{
+ private String scaleDownGroupName;
+
+ // Constructors --------------------------------------------------
+
+ public NodeAnnounceMessage_V2(final long currentEventID, final String nodeID, final String backupGroupName, final String scaleDownGroupName, final boolean backup, final TransportConfiguration tc, final TransportConfiguration backupConnector)
+ {
+ super(NODE_ANNOUNCE_V2);
+
+ this.currentEventID = currentEventID;
+
+ this.nodeID = nodeID;
+
+ this.backupGroupName = backupGroupName;
+
+ this.backup = backup;
+
+ this.connector = tc;
+
+ this.backupConnector = backupConnector;
+
+ this.scaleDownGroupName = scaleDownGroupName;
+ }
+
+ public NodeAnnounceMessage_V2()
+ {
+ super(NODE_ANNOUNCE_V2);
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getScaleDownGroupName()
+ {
+ return scaleDownGroupName;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ super.encodeRest(buffer);
+ buffer.writeNullableString(scaleDownGroupName);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ super.decodeRest(buffer);
+ scaleDownGroupName = buffer.readNullableString();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "NodeAnnounceMessage_V2 [scaleDownGroupName=" + scaleDownGroupName +
+ ", toString()=" +
+ super.toString() +
+ "]";
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + ((scaleDownGroupName == null) ? 0 : scaleDownGroupName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (!super.equals(obj))
+ {
+ return false;
+ }
+ if (!(obj instanceof NodeAnnounceMessage_V2))
+ {
+ return false;
+ }
+ NodeAnnounceMessage_V2 other = (NodeAnnounceMessage_V2) obj;
+ if (scaleDownGroupName == null)
+ {
+ if (other.scaleDownGroupName != null)
+ {
+ return false;
+ }
+ }
+ else if (!scaleDownGroupName.equals(other.scaleDownGroupName))
+ {
+ return false;
+ }
+ return true;
+ }
+}
View
6 hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
@@ -16,6 +16,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -150,6 +151,11 @@
void disconnect(boolean criticalError);
/**
+ * Disconnect the connection, closing all channels
+ */
+ void disconnect(TransportConfiguration transportConfiguration, boolean criticalError);
+
+ /**
* returns true if any data has been received since the last time this method was called.
*
* @return true if data has been received.
View
2 hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java
@@ -57,7 +57,7 @@
void ping(long connectionTTL);
- void sendNodeAnnounce(long currentEventID, String nodeID, String nodeName, boolean backup, TransportConfiguration config, TransportConfiguration backupConfig);
+ void sendNodeAnnounce(long currentEventID, String nodeID, String backupGroupName, String scaleDownGroupName, boolean backup, TransportConfiguration config, TransportConfiguration backupConfig);
SessionContext createSessionContext(final String name,
final String username,
View
11 hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java
@@ -23,13 +23,14 @@
public interface ProtocolResponseHandler
{
// This is sent when the server is telling the client the node is being disconnected
- void nodeDisconnected(RemotingConnection conn, String nodeID);
+ void nodeDisconnected(RemotingConnection conn, String nodeID);
void notifyNodeUp(long uniqueEventID,
- final String nodeID,
- final String nodeName,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean isLast);
+ final String backupGroupName,
+ final String scaleDownGroupName,
+ final String nodeName,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean isLast);
// This is sent when any node on the cluster topology is going down
void notifyNodeDown(final long eventTime, final String nodeID);
View
2 hornetq-core-client/src/main/resources/hornetq-version.properties
@@ -6,4 +6,4 @@ hornetq.version.incrementingVersion=${hornetq.version.incrementingVersion}
hornetq.version.versionSuffix=${hornetq.version.versionSuffix}
hornetq.version.versionTag=${hornetq.version.versionTag}
hornetq.netty.version=${netty.version.string}
-hornetq.version.compatibleVersionList=121,122,123,124
+hornetq.version.compatibleVersionList=121,122,123,124,125
View
9 ...mqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonRemotingConnection.java
@@ -32,6 +32,7 @@
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
@@ -262,7 +263,13 @@ public boolean isDestroyed()
}
@Override
- public void disconnect(boolean criticalError)
+ public void disconnect(final boolean criticalError)
+ {
+ disconnect(null, criticalError);
+ }
+
+ @Override
+ public void disconnect(final TransportConfiguration tc, final boolean criticalError)
{
destroy();
}
View
7 ...hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
@@ -23,6 +23,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.protocol.stomp.v12.StompFrameHandlerV12;
@@ -624,6 +625,12 @@ protected void sendServerMessage(ServerMessageImpl message, String txID) throws
@Override
public void disconnect(final boolean criticalError)
{
+ disconnect(null, criticalError);
+ }
+
+ @Override
+ public void disconnect(final TransportConfiguration tc, final boolean criticalError)
+ {
destroy();
}
View
42 hornetq-server/src/main/java/org/hornetq/core/config/Configuration.java
@@ -58,6 +58,20 @@
void setBackupGroupName(String nodeGroupName);
/**
+ * returns the name used to group
+ *
+ * @return the name of the group
+ */
+ String getScaleDownGroupName();
+
+ /**
+ * Used to configure groups of live/backup servers.
+ *
+ * @param nodeGroupName the node group name
+ */
+ void setScaleDownGroupName(String nodeGroupName);
+
+ /**
* Returns whether this server is clustered. <br>
* {@code true} if {@link #getClusterConfigurations()} is not empty.
*/
@@ -447,6 +461,19 @@
void setFailoverOnServerShutdown(boolean failoverOnServerShutdown);
/**
+ * Should we scaleDown our messages when the server is shutdown cleanly.
+ *
+ * @return true if server should scaleDown its messages on clean shutdown
+ * @see #setScaleDown(boolean)
+ */
+ boolean isScaleDown();
+
+ /**
+ * Sets whether to allow the server to scaleDown its messages on server shutdown.
+ */
+ void setScaleDown(boolean scaleDown);
+
+ /**
* Sets the cluster password for this server.
*/
void setClusterPassword(String password);
@@ -933,6 +960,21 @@
*/
String getReplicationClustername();
+ /**
+ * Name of the cluster configuration to use for scaling down.
+ * <p/>
+ * Only applicable for servers with more than one cluster configuration.
+ *
+ * @param clusterName
+ */
+ void setScaleDownClustername(String clusterName);
+
+ /**
+ * @return name of the cluster configuration to use
+ * @see #setScaleDownClustername(String)
+ */
+ String getScaleDownClustername();
+
/*
* Whether or not that HornetQ should use all protocols available on the classpath. If false only the core protocol will
* be set, any other protocols will need to be set directly on the HornetQServer
View
19 hornetq-server/src/main/java/org/hornetq/core/config/ConfigurationUtils.java
@@ -13,6 +13,7 @@
package org.hornetq.core.config;
import org.hornetq.api.core.HornetQIllegalStateException;
+import org.hornetq.core.server.HornetQServerLogger;
public final class ConfigurationUtils
{
@@ -32,6 +33,22 @@ public static ClusterConnectionConfiguration getReplicationClusterConfiguration(
if (replicationCluster.equals(clusterConf.getName()))
return clusterConf;
}
- throw new HornetQIllegalStateException("Missing cluster-configuration for replication-cluster-name '" + replicationCluster + "'.");
+ throw new HornetQIllegalStateException("Missing cluster-configuration for replication-clustername '" + replicationCluster + "'.");
+ }
+
+ public static ClusterConnectionConfiguration getScaleDownClusterConfiguration(Configuration conf)
+ {
+ final String scaleDownClustername = conf.getScaleDownClustername();
+ if (scaleDownClustername == null || scaleDownClustername.isEmpty())
+ return conf.getClusterConfigurations().get(0);
+ for (ClusterConnectionConfiguration clusterConf : conf.getClusterConfigurations())
+ {
+ if (scaleDownClustername.equals(clusterConf.getName()))
+ return clusterConf;
+ }
+
+ HornetQServerLogger.LOGGER.missingClusterConfigForScaleDown(scaleDownClustername);
+
+ return null;
}
}
View
68 hornetq-server/src/main/java/org/hornetq/core/config/impl/ConfigurationImpl.java
@@ -56,7 +56,9 @@
private String name = "ConfigurationImpl::" + System.identityHashCode(this);
- private String nodeGroupName = null;
+ private String backupGroupName = null;
+
+ private String scaleDownGroupName = null;
protected boolean backup = HornetQDefaultConfiguration.isDefaultBackup();
@@ -193,6 +195,8 @@
protected boolean failoverOnServerShutdown = HornetQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
+ protected boolean scaleDown = HornetQDefaultConfiguration.isDefaultScaleDown();
+
// percentage of free memory which triggers warning from the memory manager
private int memoryWarningThreshold = HornetQDefaultConfiguration.getDefaultMemoryWarningThreshold();
@@ -216,6 +220,8 @@
private String replicationClusterName;
+ private String scaleDownClusterName;
+
private boolean resolveProtocols = HornetQDefaultConfiguration.isDefaultResolveProtocols();
private int maxSavedReplicatedJournalsSize = HornetQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize();
@@ -805,6 +811,16 @@ public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown)
this.failoverOnServerShutdown = failoverOnServerShutdown;
}
+ public boolean isScaleDown()
+ {
+ return scaleDown;
+ }
+
+ public void setScaleDown(boolean scaleDown)
+ {
+ this.scaleDown = scaleDown;
+ }
+
public void setClusterPassword(final String theclusterPassword)
{
clusterPassword = theclusterPassword;
@@ -988,12 +1004,22 @@ public void setName(String name)
public String getBackupGroupName()
{
- return nodeGroupName;
+ return backupGroupName;
}
public void setBackupGroupName(String nodeGroupName)
{
- this.nodeGroupName = nodeGroupName;
+ this.backupGroupName = nodeGroupName;
+ }
+
+ public String getScaleDownGroupName()
+ {
+ return scaleDownGroupName;
+ }
+
+ public void setScaleDownGroupName(String nodeGroupName)
+ {
+ this.scaleDownGroupName = nodeGroupName;
}
@Override
@@ -1044,6 +1070,18 @@ public String getReplicationClustername()
}
@Override
+ public void setScaleDownClustername(String clusterName)
+ {
+ this.scaleDownClusterName = clusterName;
+ }
+
+ @Override
+ public String getScaleDownClustername()
+ {
+ return scaleDownClusterName;
+ }
+
+ @Override
public void setMaxSavedReplicatedJournalSize(int maxSavedReplicatedJournalsSize)
{
this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
@@ -1135,7 +1173,8 @@ public int hashCode()
result = prime * result + (int)(messageExpiryScanPeriod ^ (messageExpiryScanPeriod >>> 32));
result = prime * result + messageExpiryThreadPriority;
result = prime * result + ((name == null) ? 0 : name.hashCode());
- result = prime * result + ((nodeGroupName == null) ? 0 : nodeGroupName.hashCode());
+ result = prime * result + ((backupGroupName == null) ? 0 : backupGroupName.hashCode());
+ result = prime * result + ((scaleDownGroupName == null) ? 0 : scaleDownGroupName.hashCode());
result =
prime * result +
((outgoingInterceptorClassNames == null) ? 0 : outgoingInterceptorClassNames.hashCode());
@@ -1145,6 +1184,7 @@ public int hashCode()
result = prime * result + (persistenceEnabled ? 1231 : 1237);
result = prime * result + ((queueConfigurations == null) ? 0 : queueConfigurations.hashCode());
result = prime * result + ((replicationClusterName == null) ? 0 : replicationClusterName.hashCode());
+ result = prime * result + ((scaleDownClusterName == null) ? 0 : scaleDownClusterName.hashCode());
result = prime * result + (runSyncSpeedTest ? 1231 : 1237);
result = prime * result + scheduledThreadPoolMaxSize;
result = prime * result + (securityEnabled ? 1231 : 1237);
@@ -1384,12 +1424,19 @@ else if (!managementNotificationAddress.equals(other.managementNotificationAddre
}
else if (!name.equals(other.name))
return false;
- if (nodeGroupName == null)
+ if (backupGroupName == null)
{
- if (other.nodeGroupName != null)
+ if (other.backupGroupName != null)
return false;
}
- else if (!nodeGroupName.equals(other.nodeGroupName))
+ else if (!backupGroupName.equals(other.backupGroupName))
+ return false;
+ if (scaleDownGroupName == null)
+ {
+ if (other.scaleDownGroupName != null)
+ return false;
+ }
+ else if (!scaleDownGroupName.equals(other.scaleDownGroupName))
return false;
if (outgoingInterceptorClassNames == null)
{
@@ -1425,6 +1472,13 @@ else if (!queueConfigurations.equals(other.queueConfigurations))
}
else if (!replicationClusterName.equals(other.replicationClusterName))
return false;
+ if (scaleDownClusterName == null)
+ {
+ if (other.scaleDownClusterName != null)
+ return false;
+ }
+ else if (!scaleDownClusterName.equals(other.scaleDownClusterName))
+ return false;
if (runSyncSpeedTest != other.runSyncSpeedTest)
return false;
if (scheduledThreadPoolMaxSize != other.scheduledThreadPoolMaxSize)
View
6 hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
@@ -188,11 +188,17 @@ public void parseMainConfig(final Element e, final Configuration config) throws
config.setBackupGroupName(getString(e, "backup-group-name", config.getBackupGroupName(),
Validators.NO_CHECK));
+ config.setScaleDownGroupName(getString(e, "scale-down-group-name", config.getScaleDownGroupName(),
+ Validators.NO_CHECK));
+
config.setFailbackDelay(getLong(e, "failback-delay", config.getFailbackDelay(), Validators.GT_ZERO));
config.setFailoverOnServerShutdown(getBoolean(e, "failover-on-shutdown",
config.isFailoverOnServerShutdown()));
+ config.setScaleDown(getBoolean(e, "scale-down",
+ config.isScaleDown()));
config.setReplicationClustername(getString(e, "replication-clustername", null, Validators.NO_CHECK));
+ config.setScaleDownClustername(getString(e, "scale-down-clustername", null, Validators.NO_CHECK));
config.setResolveProtocols(getBoolean(e, "resolve-protocols", config.isResolveProtocols()));
View
31 hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java
@@ -306,6 +306,37 @@ public boolean isFailoverOnServerShutdown()
}
}
+ public void setScaleDown(boolean scaleDown)
+ {
+ checkStarted();
+
+ clearIO();
+ try
+ {
+ configuration.setScaleDown(scaleDown);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+
+ public boolean isScaleDown()
+ {
+ checkStarted();
+
+ clearIO();
+ try
+ {
+ return configuration.isScaleDown();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
public int getJournalMaxIO()
{
checkStarted();
View
2 hornetq-server/src/main/java/org/hornetq/core/paging/cursor/PageSubscription.java
@@ -71,6 +71,8 @@
void ack(PagedReference ref) throws Exception;
+ boolean contains(PagedReference ref) throws Exception;
+
// for internal (cursor) classes
void confirmPosition(PagePosition ref) throws Exception;
View
31 hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -492,6 +492,30 @@ public void ack(final PagedReference reference) throws Exception
tx.commit();
}
+ public boolean contains(PagedReference ref) throws Exception
+ {
+ // We first verify if the message was routed to this queue
+ boolean routed = false;
+
+ for (long idRef : ref.getPagedMessage().getQueueIDs())
+ {
+ if (idRef == this.cursorId)
+ {
+ routed = true;
+ break;
+ }
+ }
+ if (!routed)
+ {
+ return false;
+ }
+ else
+ {
+ // if it's been routed here, we have to verify if it was acked
+ return !getPageInfo(ref.getPosition()).isAck(ref.getPosition());
+ }
+ }
+
public void confirmPosition(final PagePosition position) throws Exception
{
// if we are dealing with a persistent cursor
@@ -1009,6 +1033,13 @@ private void onPageDone(final PageCursorInfo info)
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
+
+ public boolean isAck(PagePosition position)
+ {
+ return completePage != null ||
+ acks.contains(position);
+ }
+
@Override
public String toString()
{
View
4 hornetq-server/src/main/java/org/hornetq/core/postoffice/PostOffice.java
@@ -12,6 +12,8 @@
*/
package org.hornetq.core.postoffice;
+import java.util.Map;
+
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.HornetQComponent;
@@ -60,6 +62,8 @@
Bindings getMatchingBindings(SimpleString address) throws Exception;
+ Map<SimpleString, Binding> getAllBindings();
+
void route(ServerMessage message, boolean direct) throws Exception;
void route(ServerMessage message, Transaction tx, boolean direct) throws Exception;
View
5 hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
@@ -577,6 +577,11 @@ public Bindings getMatchingBindings(final SimpleString address) throws Exception
return addressManager.getMatchingBindings(address);
}
+ public Map<SimpleString, Binding> getAllBindings()
+ {
+ return addressManager.getBindings();
+ }
+
public void route(final ServerMessage message, final boolean direct) throws Exception
{
route(message, (Transaction) null, direct);
View
22 hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
@@ -41,7 +41,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
+import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage_V2;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
@@ -233,7 +235,14 @@ public void nodeUP(final TopologyMember topologyMember, final boolean last)
{
public void run()
{
- if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
+ if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V3))
+ {
+ channel0.send(new ClusterTopologyChangeMessage_V3(topologyMember.getUniqueEventID(),
+ nodeID, topologyMember.getBackupGroupName(),
+ topologyMember.getScaleDownGroupName(),
+ connectorPair, last));
+ }
+ else if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
{
channel0.send(new ClusterTopologyChangeMessage_V2(topologyMember.getUniqueEventID(),
nodeID, topologyMember.getBackupGroupName(),
@@ -314,7 +323,7 @@ public void run()
Pair<TransportConfiguration, TransportConfiguration> emptyConfig = new Pair<TransportConfiguration, TransportConfiguration>(null, null);
if (channel0.supports(PacketImpl.CLUSTER_TOPOLOGY_V2))
{
- channel0.send(new ClusterTopologyChangeMessage_V2(System.currentTimeMillis(), nodeId, server.getConfiguration().getName(), emptyConfig, true));
+ channel0.send(new ClusterTopologyChangeMessage_V2(System.currentTimeMillis(), nodeId, null, emptyConfig, true));
}
else
{
@@ -325,7 +334,7 @@ public void run()
}
}
- else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
+ else if (packet.getType() == PacketImpl.NODE_ANNOUNCE || packet.getType() == PacketImpl.NODE_ANNOUNCE_V2)
{
NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
@@ -348,7 +357,12 @@ else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
ClusterConnection clusterConn = acceptorUsed.getClusterConnection();
if (clusterConn != null)
{
- clusterConn.nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), msg.getNodeName(), pair, msg.isBackup());
+ String scaleDownGroupName = null;
+ if (packet.getType() == PacketImpl.NODE_ANNOUNCE_V2)
+ {
+ scaleDownGroupName = ((NodeAnnounceMessage_V2)msg).getScaleDownGroupName();
+ }
+ clusterConn.nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), msg.getBackupGroupName(), scaleDownGroupName, pair, msg.isBackup());
}
else
{
View
15 hornetq-server/src/main/java/org/hornetq/core/remoting/server/RemotingService.java
@@ -15,6 +15,7 @@
import java.util.Set;
import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.security.HornetQPrincipal;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -31,6 +32,7 @@
* Remove a connection from the connections held by the remoting service.
* <strong>This method must be used only from the management API.
* RemotingConnections are removed from the remoting service when their connectionTTL is hit.</strong>
+ *
* @param remotingConnectionID the ID of the RemotingConnection to removed
* @return the removed RemotingConnection
*/
@@ -56,19 +58,22 @@
* Allow acceptors to use this as their default security Principal if applicable.
* <p>
* Used by AS7 integration code.
+ *
* @param principal
*/
void allowInvmSecurityOverride(HornetQPrincipal principal);
/**
- * Freezes and then disconnects all connections except the given one.
- * @param backupTransportConnection
+ * Freezes and then disconnects all connections except the given one and tells the client where else
+ * it might connect (only applicable if server is in a cluster and uses scaleDown-on-failover=true).
+ *
+ * @param transportConfiguration
+ * @param remotingConnection
*/
- void freeze(CoreRemotingConnection rc);
+ void freeze(TransportConfiguration transportConfiguration, CoreRemotingConnection remotingConnection);
/**
- * Returns the acceptor identified by its {@code
- * name} or {@code null} if it does not exists.
+ * Returns the acceptor identified by its {@code name} or {@code null} if it does not exists.
*
* @param name the name of the acceptor
*/
View
5 hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
@@ -341,7 +341,7 @@ public synchronized void allowInvmSecurityOverride(HornetQPrincipal principal)
}
}
- public synchronized void freeze(final CoreRemotingConnection connectionToKeepOpen)
+ public synchronized void freeze(final TransportConfiguration transportConfiguration, final CoreRemotingConnection connectionToKeepOpen)
{
if (!started)
return;
@@ -376,11 +376,10 @@ public synchronized void freeze(final CoreRemotingConnection connectionToKeepOpe
if (!conn.isClient())
{
- conn.disconnect(false);
+ conn.disconnect(transportConfiguration, false);
connections.remove(entry.getKey());