Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

retagging 2.2.8

  • Loading branch information...
commit f07d9916af09861466e760945cd8041cc82a5049 2 parents 5a588f2 + 8585fbb
@clebertsuconic clebertsuconic authored
Showing with 1,329 additions and 186 deletions.
  1. +1 −1  .classpath
  2. +10 −6 build-maven.xml
  3. +10 −0 docs/user-manual/en/clusters.xml
  4. +2 −1  src/config/jboss-as-4/clustered/hornetq-configuration.xml
  5. +2 −1  src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
  6. +2 −1  src/config/jboss-as-5/clustered/hornetq-configuration.xml
  7. +2 −1  src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
  8. +2 −1  src/config/jboss-as-6/clustered/hornetq-configuration.xml
  9. +2 −1  src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
  10. +5 −5 src/config/ra.xml
  11. +4 −0 src/main/org/hornetq/api/core/management/ClusterConnectionControl.java
  12. +4 −0 src/main/org/hornetq/api/jms/management/JMSQueueControl.java
  13. +6 −2 src/main/org/hornetq/api/jms/management/TopicControl.java
  14. +48 −26 src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
  15. +16 −7 src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
  16. +75 −32 src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
  17. +2 −1  src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
  18. +5 −0 src/main/org/hornetq/core/client/impl/DelegatingSession.java
  19. +36 −13 src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
  20. +5 −0 src/main/org/hornetq/core/client/impl/Topology.java
  21. +26 −0 src/main/org/hornetq/core/config/BridgeConfiguration.java
  22. +12 −0 src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
  23. +9 −1 src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
  24. +9 −0 src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
  25. +6 −0 src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
  26. +24 −6 src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
  27. +4 −0 src/main/org/hornetq/core/server/impl/QueueImpl.java
  28. +65 −20 src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
  29. +25 −13 src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
  30. +9 −1 src/main/org/hornetq/integration/spring/SpringBindingRegistry.java
  31. +5 −0 src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
  32. +10 −0 src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
  33. +24 −1 src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
  34. +2 −2 src/main/org/hornetq/ra/HornetQRAConnectionMetaData.java
  35. +7 −0 src/main/org/hornetq/ra/inflow/HornetQActivation.java
  36. +39 −4 tests/src/org/hornetq/tests/integration/InterceptorTest.java
  37. +110 −1 tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java
  38. +3 −0  tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
  39. +7 −0 tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
  40. +1 −1  tests/src/org/hornetq/tests/integration/client/PagingTest.java
  41. +20 −24 tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
  42. +1 −1  tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
  43. +221 −0 tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
  44. +10 −1 tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
  45. +357 −1 tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
  46. +6 −0 tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
  47. +6 −0 tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java
  48. +11 −4 tests/src/org/hornetq/tests/integration/jms/client/StoreConfigTest.java
  49. +19 −1 tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
  50. +6 −0 tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
  51. +5 −0 tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlUsingCoreTest.java
  52. +3 −2 tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
  53. +5 −0 tests/src/org/hornetq/tests/integration/spring/ExampleListener.java
  54. +6 −1 tests/src/org/hornetq/tests/integration/spring/SpringIntegrationTest.java
  55. +10 −2 tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java
  56. +7 −0 tests/src/org/hornetq/tests/util/UnitTestCase.java
View
2  .classpath
@@ -113,7 +113,7 @@
<classpathentry kind="lib" path="thirdparty/org/jboss/integration/lib/jboss-transaction-spi.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/javaee/lib/jboss-jaspi-api.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/javaee/lib/jboss-jca-api.jar"/>
- <classpathentry kind="lib" path="thirdparty/org/jboss/javaee/lib/jboss-jms-api.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/jboss/javaee/lib/jboss-jms-api.jar" sourcepath="/Users/clebertsuconic/.m2/repository/org/jboss/javaee/jboss-jms-api/1.1.0.GA/jboss-jms-api-1.1.0.GA-sources.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/javaee/lib/jboss-transaction-api.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/lib/jboss-common-core.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/lib/jboss-mdr.jar"/>
View
16 build-maven.xml
@@ -17,6 +17,10 @@
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
+ <condition property="maven.executable" value="mvn.bat" else="mvn">
+ <os family="windows"/>
+ </condition>
+
<target name="uploadHornetQBootstrap">
<antcall target="upload">
<param name="artifact.id" value="hornetq-bootstrap"/>
@@ -162,7 +166,7 @@
<target name="upload-local-target">
<!-- install the jar -->
- <exec executable="mvn" dir="./build">
+ <exec executable="${maven.executable}" dir="./build">
<arg value="install:install-file"/>
<!-- uncomment the following line to deploy to the JBoss 5 repository -->
<!-- <arg value="-Dmaven.repo.local=/work/eap-51/maven-repository"/> -->
@@ -173,7 +177,7 @@
<arg value="-Dfile=./jars/${file-name}.jar"/>
</exec>
<!-- install the sources jar -->
- <exec executable="mvn" dir="./build">
+ <exec executable="${maven.executable}" dir="./build">
<arg value="install:install-file"/>
<arg value="-DgroupId=org.hornetq"/>
<!-- uncomment the following line to deploy to the JBoss 5 repository -->
@@ -188,7 +192,7 @@
<target name="upload">
<!-- upload the jar -->
- <exec executable="mvn">
+ <exec executable="${maven.executable}">
<arg value="deploy:deploy-file"/>
<arg value="-e"/>
<arg value="-DgroupId=org.hornetq"/>
@@ -201,7 +205,7 @@
<arg value="-Durl=dav:https://snapshots.jboss.org/maven2"/>
</exec>
<!-- upload the corresponding sources jar -->
- <exec executable="mvn">
+ <exec executable="${maven.executable}">
<arg value="deploy:deploy-file"/>
<arg value="-e"/>
<arg value="-DgroupId=org.hornetq"/>
@@ -283,7 +287,7 @@
&lt;version&gt;${hornetq.version}&lt;/version&gt;${line.separator}&lt;/project&gt;"/>
<!-- deploy the jar -->
- <exec executable="mvn">
+ <exec executable="${maven.executable}">
<arg value="-e"/>
<arg value="deploy:deploy-file"/>
<arg value="-DpomFile=${temporary.pom}"/>
@@ -296,7 +300,7 @@
<arg value="-Durl=https://repository.jboss.org/nexus/service/local/staging/deploy/maven2/"/>
</exec>
<!-- deploy the sources jar -->
- <exec executable="mvn">
+ <exec executable="${maven.executable}">
<arg value="-e"/>
<arg value="deploy:deploy-file"/>
<arg value="-DpomFile=${temporary.pom}"/>
View
10 docs/user-manual/en/clusters.xml
@@ -511,6 +511,16 @@ ClientSession session = factory.createSession();
>false</literal>.</para>
</listitem>
<listitem>
+ <para><literal>min-large-message-size</literal>. This parameters determines when a
+ message should be splitted with multiple packages when sent over the cluster.</para>
+ <para>This parameter is optional and its default is at 100K.</para>
+ </listitem>
+ <listitem>
+ <para><literal>reconnect-attempts"</literal>.The number of times the system will
+ try to connect a node on the cluster. If the max-retry is achieved this node will be considered permanently down and the system will stop routing messages to this node.</para>
+ <para>This parameter is optional and its default is at -1 (infinite retries).</para>
+ </listitem>
+ <listitem>
<para><literal>max-hops</literal>. When a cluster connection decides the set of
nodes to which it might load balance a message, those nodes do not have to
be directly connected to it via a cluster connection. HornetQ can be
View
3  src/config/jboss-as-4/clustered/hornetq-configuration.xml
@@ -116,10 +116,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
View
3  src/config/jboss-as-4/non-clustered/hornetq-configuration.xml
@@ -89,10 +89,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
View
3  src/config/jboss-as-5/clustered/hornetq-configuration.xml
@@ -116,10 +116,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
View
3  src/config/jboss-as-5/non-clustered/hornetq-configuration.xml
@@ -89,10 +89,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
View
3  src/config/jboss-as-6/clustered/hornetq-configuration.xml
@@ -116,10 +116,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
View
3  src/config/jboss-as-6/non-clustered/hornetq-configuration.xml
@@ -89,10 +89,11 @@
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>60000</redelivery-delay>
+ <redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
+ <redistribution-delay>60000</redistribution-delay>
</address-setting>
</address-settings>
View
10 src/config/ra.xml
@@ -8,8 +8,8 @@
http://java.sun.com/xml/ns/j2ee/connector_1_5.xsd"
version="1.5">
- <description>HornetQ 2.0 Resource Adapter</description>
- <display-name>HornetQ 2.0 Resource Adapter</display-name>
+ <description>HornetQ 2.2 Resource Adapter</description>
+ <display-name>HornetQ 2.2 Resource Adapter</display-name>
<vendor-name>Red Hat Middleware LLC</vendor-name>
<eis-type>JMS 1.1 Server</eis-type>
@@ -246,19 +246,19 @@ Copyright 2009 Red Hat, Inc.
<config-property>
<description>whether to use jndi for looking up destinations etc</description>
<config-property-name>UseJNDI</config-property-name>
- <config-property-type>boolean</config-property-type>
+ <config-property-type>java.lang.Boolean</config-property-type>
<config-property-value></config-property-value>
</config-property>
<config-property>
<description>how long in milliseconds to wait before retry on failed MDB setup</description>
<config-property-name>SetupInterval</config-property-name>
- <config-property-type>long</config-property-type>
+ <config-property-type>java.lang.Long</config-property-type>
<config-property-value></config-property-value>
</config-property>
<config-property>
<description>How many attempts should be made when connecting the MDB</description>
<config-property-name>SetupAttempts</config-property-name>
- <config-property-type>int</config-property-type>
+ <config-property-type>java.lang.Integer</config-property-type>
<config-property-value></config-property-value>
</config-property>-->
View
4 src/main/org/hornetq/api/core/management/ClusterConnectionControl.java
@@ -49,6 +49,10 @@
boolean isForwardWhenNoConsumers();
/**
+ * Return the Topology that this Cluster Connection knows about
+ */
+ String getTopology();
+ /**
* Returns the maximum number of hops used by this cluster connection.
*/
int getMaxHops();
View
4 src/main/org/hornetq/api/jms/management/JMSQueueControl.java
@@ -77,6 +77,10 @@
*/
@Operation(desc = "Adds the queue to another JNDI binding")
void addJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception;
+
+ @Operation(desc = "Adds the queue to another JNDI binding")
+ void removeJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception;
+
/**
* Lists all the JMS messages in this queue matching the specified filter.
View
8 src/main/org/hornetq/api/jms/management/TopicControl.java
@@ -13,7 +13,6 @@
package org.hornetq.api.jms.management;
-import java.util.List;
import java.util.Map;
import javax.management.MBeanOperationInfo;
@@ -64,8 +63,13 @@
/**
* Add the JNDI binding to this destination
*/
- @Operation(desc = "Adds the queue to another JNDI binding")
+ @Operation(desc = "Add the queue to another JNDI binding")
void addJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception;
+
+ @Operation(desc = "Add the queue to another JNDI binding")
+ void removeJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception;
+
+
View
74 src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
@@ -104,6 +104,8 @@
private volatile boolean closed;
private volatile int creditsToSend;
+
+ private volatile boolean failedOver;
private volatile Exception lastException;
@@ -165,7 +167,7 @@ public ClientConsumerImpl(final ClientSessionInternal session,
// ClientConsumer implementation
// -----------------------------------------------------------------
- private ClientMessage receive(long timeout, final boolean forcingDelivery) throws HornetQException
+ private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws HornetQException
{
checkClosed();
@@ -194,17 +196,14 @@ private ClientMessage receive(long timeout, final boolean forcingDelivery) throw
receiverThread = Thread.currentThread();
- if (timeout == 0)
- {
- // Effectively infinite
- timeout = Long.MAX_VALUE;
- }
-
+ // To verify if deliveryForced was already call
boolean deliveryForced = false;
+ // To control when to call deliveryForce
+ boolean callForceDelivery = false;
long start = -1;
- long toWait = timeout;
+ long toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
try
{
@@ -231,13 +230,8 @@ private ClientMessage receive(long timeout, final boolean forcingDelivery) throw
// we only force delivery once per call to receive
if (!deliveryForced)
{
- if (isTrace)
- {
- log.trace("Forcing delivery");
- }
- session.forceDelivery(id, forceDeliveryCount++);
-
- deliveryForced = true;
+ callForceDelivery = true;
+ break;
}
}
@@ -262,6 +256,35 @@ private ClientMessage receive(long timeout, final boolean forcingDelivery) throw
}
}
+ if (failedOver)
+ {
+ if (m == null)
+ {
+ // if failed over and the buffer is null, we reset the state and try it again
+ failedOver = false;
+ deliveryForced = false;
+ toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
+ continue;
+ }
+ else
+ {
+ failedOver = false;
+ }
+ }
+
+ if (callForceDelivery)
+ {
+ if (isTrace)
+ {
+ log.trace("Forcing delivery");
+ }
+ // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
+ session.forceDelivery(id, forceDeliveryCount++);
+ callForceDelivery = false;
+ deliveryForced = true;
+ continue;
+ }
+
if (m != null)
{
session.workDone();
@@ -351,19 +374,14 @@ private ClientMessage receive(long timeout, final boolean forcingDelivery) throw
public ClientMessage receive(final long timeout) throws HornetQException
{
- if (isBrowseOnly())
- {
- ClientMessage msg = receive(timeout, false);
- if (msg == null)
- {
- msg = receive(0, true);
- }
- return msg;
- }
- else
+ ClientMessage msg = receive(timeout, false);
+
+ if (msg == null && !closed)
{
- return receive(timeout, false);
+ msg = receive(0, true);
}
+
+ return msg;
}
public ClientMessage receive() throws HornetQException
@@ -465,6 +483,8 @@ public void clearAtFailover()
lastAckedMessage = null;
creditsToSend = 0;
+
+ failedOver = true;
ackIndividually = false;
}
@@ -887,6 +907,8 @@ private void callOnMessage() throws Exception
{
rateLimiter.limit();
}
+
+ failedOver = false;
synchronized (this)
{
View
23 src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
@@ -921,6 +921,17 @@ private void callFailureListeners(final HornetQException me, final boolean after
*/
private void reconnectSessions(final CoreRemotingConnection oldConnection, final int reconnectAttempts)
{
+ HashSet<ClientSessionInternal> sessionsToFailover;
+ synchronized (sessions)
+ {
+ sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
+ }
+
+ for (ClientSessionInternal session : sessionsToFailover)
+ {
+ session.preHandleFailover(connection);
+ }
+
getConnectionWithRetry(reconnectAttempts);
if (connection == null)
@@ -946,12 +957,6 @@ private void reconnectSessions(final CoreRemotingConnection oldConnection, final
connection.setFailureListeners(newListeners);
- HashSet<ClientSessionInternal> sessionsToFailover;
- synchronized (sessions)
- {
- sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
- }
-
for (ClientSessionInternal session : sessionsToFailover)
{
session.handleFailover(connection);
@@ -968,7 +973,7 @@ private void getConnectionWithRetry(final int reconnectAttempts)
" multiplier = " +
retryIntervalMultiplier, new Exception("trace"));
}
-
+
long interval = retryInterval;
int count = 0;
@@ -1042,6 +1047,10 @@ else if (reconnectAttempts == 1)
}
else
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Reconnection successfull");
+ }
return;
}
}
View
107 src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
@@ -398,15 +398,8 @@ public void forceDelivery(final long consumerID, final long sequence) throws Hor
{
checkClosed();
- // JBPAPP-6030 - Using the executor to avoid distributed dead locks
- executor.execute(new Runnable()
- {
- public void run()
- {
- SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
- channel.send(request);
- }
- });
+ SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
+ channel.send(request);
}
public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException
@@ -594,8 +587,9 @@ public void rollback(final boolean isLastMessageAsDelivered) throws HornetQExcep
stop();
}
+
// We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
consumer.clear(true);
}
@@ -672,7 +666,7 @@ public void start() throws HornetQException
if (!started)
{
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ for (ClientConsumerInternal clientConsumerInternal : cloneConsumers())
{
clientConsumerInternal.start();
}
@@ -694,7 +688,7 @@ public void stop(final boolean waitForOnMessage) throws HornetQException
if (started)
{
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ for (ClientConsumerInternal clientConsumerInternal : cloneConsumers())
{
clientConsumerInternal.stop(waitForOnMessage);
}
@@ -756,7 +750,10 @@ public void acknowledge(final long consumerID, final long messageID) throws Horn
}
checkClosed();
-
+ if (log.isDebugEnabled())
+ {
+ log.debug("client ack messageID = " + messageID);
+ }
SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID, messageID, blockOnAcknowledge);
if (blockOnAcknowledge)
@@ -816,7 +813,10 @@ public void addConsumer(final ClientConsumerInternal consumer)
public void addProducer(final ClientProducerInternal producer)
{
- producers.add(producer);
+ synchronized (producers)
+ {
+ producers.add(producer);
+ }
}
public void removeConsumer(final ClientConsumerInternal consumer) throws HornetQException
@@ -829,12 +829,15 @@ public void removeConsumer(final ClientConsumerInternal consumer) throws HornetQ
public void removeProducer(final ClientProducerInternal producer)
{
- producers.remove(producer);
+ synchronized (producers)
+ {
+ producers.remove(producer);
+ }
}
public void handleReceiveMessage(final long consumerID, final SessionReceiveMessage message) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -850,7 +853,7 @@ public void handleReceiveMessage(final long consumerID, final SessionReceiveMess
public void handleReceiveLargeMessage(final long consumerID, final SessionReceiveLargeMessage message) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -860,7 +863,7 @@ public void handleReceiveLargeMessage(final long consumerID, final SessionReceiv
public void handleReceiveContinuation(final long consumerID, final SessionReceiveContinuationMessage continuation) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -928,6 +931,14 @@ public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handl
sendAckHandler = handler;
}
+ public void preHandleFailover(CoreRemotingConnection connection)
+ {
+ // We lock the channel to prevent any packets to be added to the resend
+ // cache during the failover process
+ //we also do this before the connection fails over to give the session a chance to block for failover
+ channel.lock();
+ }
+
// Needs to be synchronized to prevent issues with occurring concurrently with close()
public void handleFailover(final CoreRemotingConnection backupConnection)
@@ -941,9 +952,6 @@ public void handleFailover(final CoreRemotingConnection backupConnection)
boolean resetCreditManager = false;
- // We lock the channel to prevent any packets to be added to the resend
- // cache during the failover process
- channel.lock();
try
{
channel.transferConnection(backupConnection);
@@ -1090,7 +1098,7 @@ public void handleFailover(final CoreRemotingConnection backupConnection)
// Now start the session if it was already started
if (started)
{
- for (ClientConsumerInternal consumer : consumers.values())
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
consumer.clearAtFailover();
consumer.start();
@@ -1527,7 +1535,7 @@ public void rollback(final Xid xid) throws XAException
}
// We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
consumer.clear(false);
}
@@ -1871,6 +1879,19 @@ public ClassLoader run()
});
}
+
+ /**
+ * @param consumerID
+ * @return
+ */
+ private ClientConsumerInternal getConsumer(final long consumerID)
+ {
+ synchronized (consumers)
+ {
+ ClientConsumerInternal consumer = consumers.get(consumerID);
+ return consumer;
+ }
+ }
private void doCleanup(boolean failingOver)
{
@@ -1900,14 +1921,14 @@ private void doCleanup(boolean failingOver)
private void cleanUpChildren() throws Exception
{
- Set<ClientConsumerInternal> consumersClone = new HashSet<ClientConsumerInternal>(consumers.values());
+ Set<ClientConsumerInternal> consumersClone = cloneConsumers();
for (ClientConsumerInternal consumer : consumersClone)
{
consumer.cleanUp();
}
- Set<ClientProducerInternal> producersClone = new HashSet<ClientProducerInternal>(producers);
+ Set<ClientProducerInternal> producersClone = cloneProducers();
for (ClientProducerInternal producer : producersClone)
{
@@ -1915,16 +1936,41 @@ private void cleanUpChildren() throws Exception
}
}
+ /**
+ * @return
+ */
+ private Set<ClientProducerInternal> cloneProducers()
+ {
+ Set<ClientProducerInternal> producersClone;
+
+ synchronized (producers)
+ {
+ producersClone = new HashSet<ClientProducerInternal>(producers);
+ }
+ return producersClone;
+ }
+
+ /**
+ * @return
+ */
+ private Set<ClientConsumerInternal> cloneConsumers()
+ {
+ synchronized (consumers)
+ {
+ return new HashSet<ClientConsumerInternal>(consumers.values());
+ }
+ }
+
private void closeChildren() throws HornetQException
{
- Set<ClientConsumer> consumersClone = new HashSet<ClientConsumer>(consumers.values());
+ Set<ClientConsumerInternal> consumersClone = cloneConsumers();
for (ClientConsumer consumer : consumersClone)
{
consumer.close();
}
- Set<ClientProducer> producersClone = new HashSet<ClientProducer>(producers);
+ Set<ClientProducerInternal> producersClone = cloneProducers();
for (ClientProducer producer : producersClone)
{
@@ -1934,12 +1980,9 @@ private void closeChildren() throws HornetQException
private void flushAcks() throws HornetQException
{
- synchronized (consumers)
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
- for (ClientConsumerInternal consumer : consumers.values())
- {
- consumer.flushAcks();
- }
+ consumer.flushAcks();
}
}
View
3  src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
@@ -59,6 +59,8 @@
void handleReceiveContinuation(long consumerID, SessionReceiveContinuationMessage continuation) throws Exception;
+ void preHandleFailover(CoreRemotingConnection connection);
+
void handleFailover(CoreRemotingConnection backupConnection);
RemotingConnection getConnection();
@@ -92,5 +94,4 @@
void setPacketSize(int packetSize);
void resetIfNeeded() throws HornetQException;
-
}
View
5 src/main/org/hornetq/core/client/impl/DelegatingSession.java
@@ -373,6 +373,11 @@ public XAResource getXAResource()
return session.getXAResource();
}
+ public void preHandleFailover(CoreRemotingConnection connection)
+ {
+ session.preHandleFailover(connection);
+ }
+
public void handleFailover(final CoreRemotingConnection backupConnection)
{
session.handleFailover(backupConnection);
View
49 src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
@@ -58,6 +58,7 @@
public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
{
/*needed for backward compatibility*/
+ @SuppressWarnings("unused")
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
/*end of compatibility fixes*/
@@ -82,7 +83,7 @@
private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
- private TransportConfiguration[] initialConnectors;
+ private volatile TransportConfiguration[] initialConnectors;
private DiscoveryGroupConfiguration discoveryGroupConfiguration;
@@ -583,12 +584,15 @@ public void disableFinalizeCheck()
public ClientSessionFactoryInternal connect() throws Exception
{
- // static list of initial connectors
- if (initialConnectors != null && discoveryGroup == null)
+ synchronized (this)
{
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)staticConnector.connect();
- addFactory(sf);
- return sf;
+ // static list of initial connectors
+ if (initialConnectors != null && discoveryGroup == null)
+ {
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)staticConnector.connect();
+ addFactory(sf);
+ return sf;
+ }
}
// wait for discovery group to get the list of initial connectors
return (ClientSessionFactoryInternal)createSessionFactory();
@@ -1454,6 +1458,7 @@ public String toString()
"]";
}
+ @SuppressWarnings("unchecked")
private synchronized void updateArraysAndPairs()
{
Collection<TopologyMember> membersCopy = topology.getMembers();
@@ -1472,13 +1477,14 @@ public synchronized void connectorsChanged()
{
List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
- this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+
+ TransportConfiguration[] newInitialconnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
newConnectors.size());
int count = 0;
for (DiscoveryEntry entry : newConnectors)
{
- this.initialConnectors[count++] = entry.getConnector();
+ newInitialconnectors[count++] = entry.getConnector();
if (ha && topology.getMember(entry.getNodeID()) == null)
{
@@ -1487,18 +1493,35 @@ public synchronized void connectorsChanged()
topology.updateMember(0, entry.getNodeID(), member);
}
}
+
+ this.initialConnectors = newInitialconnectors;
if (clusterConnection && !receivedTopology && initialConnectors.length > 0)
{
- // FIXME the node is alone in the cluster. We create a connection to the new node
+ // The node is alone in the cluster. We create a connection to the new node
// to trigger the node notification to form the cluster.
- try
+
+ Runnable connectRunnable = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ };
+ if (startExecutor != null)
{
- connect();
+ startExecutor.execute(connectRunnable);
}
- catch (Exception e)
+ else
{
- e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
+ connectRunnable.run();
}
}
}
View
5 src/main/org/hornetq/core/client/impl/Topology.java
@@ -225,6 +225,11 @@ public boolean updateMember(final long uniqueEventID, final String nodeId, final
}
else
{
+ /*always add the backup, better to try to reconnect to something thats not there then to not know about it at all*/
+ if(currentMember.getB() == null && memberInput.getB() != null)
+ {
+ currentMember.setB(memberInput.getB());
+ }
return false;
}
}
View
26 src/main/org/hornetq/core/config/BridgeConfiguration.java
@@ -68,6 +68,10 @@
private final long maxRetryInterval;
private final int minLargeMessageSize;
+
+ // At this point this is only changed on testcases
+ // The bridge shouldn't be sending blocking anyways
+ private long callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
/**
* For backward compatibility on the API... no MinLargeMessage on this constructor
@@ -445,4 +449,26 @@ public void setPassword(String password)
{
this.password = password;
}
+
+
+ /**
+ * @return the callTimeout
+ */
+ public long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
+ /**
+ *
+ * At this point this is only changed on testcases
+ * The bridge shouldn't be sending blocking anyways
+ * @param callTimeout the callTimeout to set
+ */
+ public void setCallTimeout(long callTimeout)
+ {
+ this.callTimeout = callTimeout;
+ }
+
+
}
View
12 src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
@@ -210,6 +210,18 @@ public boolean isForwardWhenNoConsumers()
}
}
+ public String getTopology()
+ {
+ clearIO();
+ try
+ {
+ return clusterConnection.getTopology().describe();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
public Map<String, String> getNodes() throws Exception
{
clearIO();
View
10 src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
@@ -572,11 +572,19 @@ public HandleStatus handle(final MessageReference ref) throws Exception
{
producer.send(dest, message);
}
- catch (HornetQException e)
+ catch (final HornetQException e)
{
log.warn("Unable to send message " + ref + ", will try again once bridge reconnects", e);
refs.remove(ref);
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ connectionFailed(e, false);
+ }
+ });
return HandleStatus.BUSY;
}
View
9 src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
@@ -497,6 +498,14 @@ public void run()
catch (Exception e)
{
log.warn("Unable to announce backup, retrying", e);
+
+ scheduledExecutor.schedule(new Runnable(){
+ public void run()
+ {
+ announceBackup();
+ }
+
+ }, retryInterval, TimeUnit.MILLISECONDS);
}
}
});
View
6 src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
@@ -471,6 +471,12 @@ public synchronized void deployBridge(final BridgeConfiguration config, final bo
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
+
+ // This will be set to 30s unless it's changed from embedded / testing
+ // there is no reason to exception the config for this timeout
+ // since the Bridge is supposed to be non-blocking and fast
+ // We may expose this if we find a good use case
+ serverLocator.setCallTimeout(config.getCallTimeout());
if (!config.isUseDuplicateDetection())
{
log.debug("Bridge " + config.getName() +
View
30 src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
@@ -317,6 +317,12 @@ protected NodeManager createNodeManager(final String directory)
public synchronized void start() throws Exception
{
+ if (started)
+ {
+ log.debug("Server already started!");
+ return;
+ }
+
log.debug("Starting server " + this);
OperationContextImpl.clearContext();
@@ -486,20 +492,32 @@ protected void stop(boolean failoverOnServerShutdown, boolean criticalIOError) t
}
+ remotingService.stop();
+
// We close all the exception in an attempt to let any pending IO to finish
// to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
// It may still be possible to have this scenario on a real failure (without the use of XA)
// But at least we will do our best to avoid it on regular shutdowns
for (ServerSession session : sessions.values())
{
- session.close(true);
- if (!criticalIOError)
+ try
{
- session.waitContextCompletion();
+ storageManager.setContext(session.getSessionContext());
+ session.close(true);
+ if (!criticalIOError)
+ {
+ session.waitContextCompletion();
+ }
+ }
+ catch (Exception e)
+ {
+ // If anything went wrong with closing sessions.. we should ignore it
+ // such as transactions.. etc.
+ log.warn(e.getMessage(), e);
}
}
-
- remotingService.stop();
+
+ storageManager.clearContext();
synchronized (this)
{
@@ -1606,7 +1624,7 @@ private void deployAddressSettingsFromConfiguration()
for (Pair<Long, Long> msgToDelete : pendingLargeMessages)
{
- log.info("Deleting pending large message as it wasn't completed:" + msgToDelete);
+ log.info("Deleting pending large message as it wasn't completed: LargeMessageID:" + msgToDelete.getB());
LargeServerMessage msg = storageManager.createLargeMessage();
msg.setMessageID(msgToDelete.getB());
msg.setPendingRecordID(msgToDelete.getA());
View
4 src/main/org/hornetq/core/server/impl/QueueImpl.java
@@ -2328,6 +2328,10 @@ public void afterRollback(final Transaction tx)
for (MessageReference ref : refsToAck)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace("rolling back " + ref);
+ }
try
{
if (ref.getQueue().checkRedelivery(ref, timeBase))
View
85 src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientConsumerImpl;
@@ -586,7 +587,7 @@ public Queue getQueue()
return messageQueue;
}
- public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
+ public void acknowledge(final boolean autoCommitAcks, Transaction tx, final long messageID) throws Exception
{
if (browseOnly)
{
@@ -595,34 +596,78 @@ public void acknowledge(final boolean autoCommitAcks, final Transaction tx, fina
// Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
// acknowledged
-
- MessageReference ref;
- do
+
+ // We use a transaction here as if the message is not found, we should rollback anything done
+ // This could eventually happen on retries during transactions, and we need to make sure we don't ACK things we are not supposed to acknowledge
+
+ boolean startedTransaction = false;
+
+ if (tx == null || autoCommitAcks)
{
- ref = deliveringRefs.poll();
-
- if (ref == null)
+ startedTransaction = true;
+ tx = new TransactionImpl(storageManager);
+ }
+
+ try
+ {
+
+ MessageReference ref;
+ do
{
- throw new IllegalStateException(System.identityHashCode(this) + " Could not find reference on consumerID=" +
- id +
- ", messageId = " +
- messageID +
- " queue = " +
- messageQueue.getName() +
- " closed = " +
- closed);
+ ref = deliveringRefs.poll();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("ACKing ref " + ref + " on " + this);
+ }
+
+ if (ref == null)
+ {
+
+ HornetQException e = new HornetQException(HornetQException.ILLEGAL_STATE, "Could not find reference on consumerID=" +
+ id +
+ ", messageId = " +
+ messageID +
+ " queue = " +
+ messageQueue.getName());
+ throw e;
+ }
+
+ ref.getQueue().acknowledge(tx, ref);
}
-
- if (autoCommitAcks || tx == null)
+ while (ref.getMessage().getMessageID() != messageID);
+
+ if (startedTransaction)
{
- ref.getQueue().acknowledge(ref);
+ tx.commit();
+ }
+ }
+ catch (HornetQException e)
+ {
+ if (startedTransaction)
+ {
+ tx.rollback();
}
else
{
- ref.getQueue().acknowledge(tx, ref);
+ tx.markAsRollbackOnly(e);
+ }
+ throw e;
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ HornetQException hqex = new HornetQException(HornetQException.ILLEGAL_STATE, e.getMessage());
+ if (startedTransaction)
+ {
+ tx.rollback();
+ }
+ else
+ {
+ tx.markAsRollbackOnly(hqex);
}
+ throw hqex;
}
- while (ref.getMessage().getMessageID() != messageID);
}
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
View
38 src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
@@ -289,7 +289,14 @@ private synchronized void doClose(final boolean failed) throws Exception
{
// We only rollback local txs on close, not XA tx branches
- rollback(failed);
+ try
+ {
+ rollback(false);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
@@ -579,6 +586,11 @@ public void forceConsumerDelivery(final long consumerID, final long sequence) th
public void acknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
+
+ if (consumer == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Consumer " + consumerID + " wasn't created on the server");
+ }
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
@@ -605,7 +617,7 @@ public void expire(final long consumerID, final long messageID) throws Exception
}
}
- public void commit() throws Exception
+ public synchronized void commit() throws Exception
{
if (isTrace)
{
@@ -621,7 +633,7 @@ public void commit() throws Exception
}
}
- public void rollback(final boolean considerLastMessageAsDelivered) throws Exception
+ public synchronized void rollback(final boolean considerLastMessageAsDelivered) throws Exception
{
if (tx == null)
{
@@ -642,7 +654,7 @@ public void rollback(final boolean considerLastMessageAsDelivered) throws Except
}
}
- public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
+ public synchronized void xaCommit(final Xid xid, final boolean onePhase) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -690,7 +702,7 @@ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
}
}
- public void xaEnd(final Xid xid) throws Exception
+ public synchronized void xaEnd(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -735,7 +747,7 @@ public void xaEnd(final Xid xid) throws Exception
}
}
- public void xaForget(final Xid xid) throws Exception
+ public synchronized void xaForget(final Xid xid) throws Exception
{
long id = resourceManager.removeHeuristicCompletion(xid);
@@ -758,7 +770,7 @@ public void xaForget(final Xid xid) throws Exception
}
}
- public void xaJoin(final Xid xid) throws Exception
+ public synchronized void xaJoin(final Xid xid) throws Exception
{
Transaction theTx = resourceManager.getTransaction(xid);
@@ -781,7 +793,7 @@ public void xaJoin(final Xid xid) throws Exception
}
}
- public void xaResume(final Xid xid) throws Exception
+ public synchronized void xaResume(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -816,7 +828,7 @@ public void xaResume(final Xid xid) throws Exception
}
}
- public void xaRollback(final Xid xid) throws Exception
+ public synchronized void xaRollback(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -865,7 +877,7 @@ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
}
}
- public void xaStart(final Xid xid) throws Exception
+ public synchronized void xaStart(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -888,7 +900,7 @@ public void xaStart(final Xid xid) throws Exception
}
}
- public void xaSuspend() throws Exception
+ public synchronized void xaSuspend() throws Exception
{
if (tx == null)
{
@@ -913,7 +925,7 @@ public void xaSuspend() throws Exception
}
}
- public void xaPrepare(final Xid xid) throws Exception
+ public synchronized void xaPrepare(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -1062,7 +1074,7 @@ public void receiveConsumerCredits(final long consumerID, final int credits) thr
if (consumer == null)
{
- ServerSessionImpl.log.error("There is no consumer with id " + consumerID);
+ ServerSessionImpl.log.debug("There is no consumer with id " + consumerID);
return;
}
View
10 src/main/org/hornetq/integration/spring/SpringBindingRegistry.java
@@ -1,6 +1,7 @@
package org.hornetq.integration.spring;
import org.hornetq.spi.core.naming.BindingRegistry;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
/**
@@ -18,7 +19,14 @@ public SpringBindingRegistry(ConfigurableBeanFactory factory)
public Object lookup(String name)
{
- return factory.getBean(name);
+ try
+ {
+ return factory.getBean(name);
+ }
+ catch (NoSuchBeanDefinitionException e)
+ {
+ return null;
+ }
}
public boolean bind(String name, Object obj)
View
5 src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
@@ -173,6 +173,11 @@ public void addJNDI(String jndi) throws Exception
{
jmsServerManager.addQueueToJndi(managedQueue.getName(), jndi);
}
+
+ public void removeJNDI(String jndi) throws Exception
+ {
+ jmsServerManager.removeQueueFromJNDI(managedQueue.getName(), jndi);
+ }
public String[] getJNDIBindings()
{
View
10 src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
@@ -91,6 +91,16 @@ public void addJNDI(String jndi) throws Exception
{
jmsServerManager.addTopicToJndi(managedTopic.getName(), jndi);
}
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.jms.management.TopicControl#removeJNDI(java.lang.String)
+ */
+ public void removeJNDI(String jndi) throws Exception
+ {
+ jmsServerManager.removeTopicFromJNDI(managedTopic.getName(), jndi);
+ }
+
public String[] getJNDIBindings()
{
View
25 src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
@@ -408,6 +408,8 @@ public String toString()
public void runException() throws Exception
{
+ checkJNDI(jndi);
+
if (internalCreateQueue(queueName, selectorString, durable))
{
@@ -462,6 +464,8 @@ public String toString()
public void runException() throws Exception
{
+ checkJNDI(jndi);
+
if (internalCreateTopic(topicName))
{
HornetQDestination destination = topics.get(topicName);
@@ -501,6 +505,8 @@ public void runException() throws Exception
public boolean addTopicToJndi(final String topicName, final String jndiBinding) throws Exception
{
checkInitialised();
+
+ checkJNDI(jndiBinding);
HornetQTopic destination = topics.get(topicName);
if (destination == null)
@@ -540,6 +546,8 @@ public boolean addQueueToJndi(final String queueName, final String jndiBinding)
{
checkInitialised();
+ checkJNDI(jndiBinding);
+
HornetQQueue destination = queues.get(queueName);
if (destination == null)
{
@@ -561,6 +569,8 @@ public boolean addQueueToJndi(final String queueName, final String jndiBinding)
public boolean addConnectionFactoryToJNDI(final String name, final String jndiBinding) throws Exception
{
checkInitialised();
+
+ checkJNDI(jndiBinding);
HornetQConnectionFactory factory = connectionFactories.get(name);
if (factory == null)
@@ -954,7 +964,7 @@ public synchronized void createConnectionFactory(final boolean storeConfig,
{
runAfterActive(new RunnableException()
{
-
+
public String toString()
{
return "createConnectionFactory for " + cfConfig.getName();
@@ -962,6 +972,7 @@ public String toString()
public void runException() throws Exception
{
+ checkJNDI(jndi);
HornetQConnectionFactory cf = internalCreateCF(storeConfig, cfConfig);
@@ -1402,6 +1413,18 @@ private void addToBindings(Map<String, List<String>> map, String name, String...
list.add(jndiItem);
}
}
+
+ private void checkJNDI(final String ... jndiNames) throws NamingException
+ {
+
+ for (String jndiName : jndiNames)
+ {
+ if (registry.lookup(jndiName) != null)
+ {
+ throw new NamingException(jndiName + " already has an object bound");
+ }
+ }
+ }
private boolean bindToJndi(final String jndiName, final Object objectToBind) throws NamingException
{
View
4 src/main/org/hornetq/ra/HornetQRAConnectionMetaData.java
@@ -113,7 +113,7 @@ public String getProviderVersion()
HornetQRAConnectionMetaData.log.trace("getJMSProviderName()");
}
- return "2.1";
+ return "2.2";
}
/**
@@ -141,7 +141,7 @@ public int getProviderMinorVersion()
HornetQRAConnectionMetaData.log.trace("getProviderMinorVersion()");
}
- return 1;
+ return 2;
}
/**
View
7 src/main/org/hornetq/ra/inflow/HornetQActivation.java
@@ -368,6 +368,13 @@ protected ClientSession setupSession() throws Exception
spec.isUseLocalTx(),
spec.getTransactionTimeout());
+ result.addMetaData("resource-adapter", "inbound");
+ result.addMetaData("jms-session", "");
+ if (spec.getClientID() != null)
+ {
+ result.addMetaData("jms-client-id", spec.getClientID());
+ }
+
HornetQActivation.log.debug("Using queue connection " + result);
return result;
View
43 tests/src/org/hornetq/tests/integration/InterceptorTest.java
@@ -19,6 +19,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientConsumerImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -127,15 +128,37 @@ public boolean intercept(final Packet packet, final RemotingConnection connectio
{
public boolean intercept(final Packet packet, final RemotingConnection connection) throws HornetQException
{
+ if (isForceDeliveryResponse(packet))
+ {
+ return true;
+ }
+
if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
{
return false;
}
-
+
return true;
}
}
+ /**
+ * @param packet
+ */
+ private boolean isForceDeliveryResponse(final Packet packet)
+ {
+ if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
+ {
+ SessionReceiveMessage msg = (SessionReceiveMessage) packet;
+ if (msg.getMessage().containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
private class MyInterceptor5 implements Interceptor
{
@@ -224,6 +247,12 @@ public void setWasCalled(final boolean wasCalled)
public boolean intercept(final Packet packet, final RemotingConnection connection) throws HornetQException
{
+
+ if (isForceDeliveryResponse(packet))
+ {
+ return true;
+ }
+
if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
{
SessionReceiveMessage p = (SessionReceiveMessage)packet;
@@ -262,6 +291,8 @@ public void testServerInterceptorChangeProperty() throws Exception
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(false);
+
+ message.putIntProperty("count", i);
message.putStringProperty(InterceptorTest.key, "apple");
@@ -275,7 +306,11 @@ public void testServerInterceptorChangeProperty() throws Exception
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer.receive(1000);
-
+
+ assertNotNull(message);
+
+ assertEquals(i, message.getIntProperty("count").intValue());
+
Assert.assertEquals("orange", message.getStringProperty(InterceptorTest.key));
}
@@ -413,7 +448,7 @@ public void testClientInterceptorRejectPacket() throws Exception
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(false);
-
+
producer.send(message);
}
@@ -422,7 +457,7 @@ public void testClientInterceptorRejectPacket() throws Exception
session.start();
ClientMessage message = consumer.receive(100);
-
+
Assert.assertNull(message);
session.close();
View
111 tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java
@@ -19,7 +19,14 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -82,6 +89,108 @@ public void testReceiveAckLastMessageOnly() throws Exception
}
}
}
+
+
+ /**
+ * This is validating a case where a consumer will try to ack a message right after failover, but the consumer at the target server didn't
+ * receive the message yet.
+ * on that case the system should rollback any acks done and redeliver any messages
+ */
+ public void testInvalidACK() throws Exception
+ {
+ HornetQServer server = createServer(false);
+ try
+ {
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setAckBatchSize(0);
+
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory cf = locator.createSessionFactory();
+
+
+ int numMessages = 100;
+
+ ClientSession sessionConsumer = cf.createSession(true, true, 0);
+
+ sessionConsumer.start();
+
+ sessionConsumer.createQueue(addressA, queueA, true);
+
+ ClientConsumer consumer = sessionConsumer.createConsumer(queueA);
+
+ // sending message
+ {
+ ClientSession sendSession = cf.createSession(false, true, true);
+
+ ClientProducer cp = sendSession.createProducer(addressA);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = sendSession.createMessage(true);
+ msg.putIntProperty("seq", i);
+ cp.send(msg);
+ }
+
+ sendSession.close();
+ }
+
+ {
+
+ ClientMessage msg = consumer.receive(5000);
+
+ // need to way some time before all the possible references are sent to the consumer
+ // as we need to guarantee the order on cancellation on this test
+ Thread.sleep(1000);
+
+ try
+ {
+ // pretending to be an unbehaved client doing an invalid ack right after failover
+ ((ClientSessionInternal)sessionConsumer).acknowledge(0, 12343);
+ fail("supposed to throw an exception here");
+ }
+ catch (Exception e)
+ {
+ }
+
+ try
+ {
+ // pretending to be an unbehaved client doing an invalid ack right after failover
+ ((ClientSessionInternal)sessionConsumer).acknowledge(3, 12343);
+ fail("supposed to throw an exception here");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ consumer.close();
+
+ consumer = sessionConsumer.createConsumer(queueA);
+
+
+ for (int i = 0 ; i < numMessages; i++)
+ {
+ msg = consumer.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("seq").intValue());
+ msg.acknowledge();
+ }
+ }
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
+
public void testAsyncConsumerNoAck() throws Exception
{
View
3  tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
@@ -128,6 +128,9 @@ public void testInterruptLargeMessageSend() throws Exception
}
server.stop(false);
+
+ forceGC();
+
server.start();
server.stop();
View
7 tests/src/org/hornetq/tests/integration/client/JMSPagingFileDeleteTest.java
@@ -149,6 +149,13 @@ public void testTopics() throws Exception
producer.send(bytesMessage);
printPageStoreInfo(pagingStore);
+
+ timeout = System.currentTimeMillis() + 10000;
+
+ while (timeout > System.currentTimeMillis() && pagingStore.getNumberOfPages() != 1)
+ {
+ Thread.sleep(100);
+ }
assertEquals(1, pagingStore.getNumberOfPages()); //I expected number of the page is 1, but It was not.
}
View
2  tests/src/org/hornetq/tests/integration/client/PagingTest.java
@@ -4536,7 +4536,7 @@ public void testExpireLargeMessageOnPaging() throws Exception
for (int i = 0; i < 500; i++)
{
log.info("Received message " + i);
- message = cons.receive(5000);
+ message = cons.receive(10000);
assertNotNull(message);
message.acknowledge();
View
44 tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
@@ -233,16 +233,15 @@ protected void testDedeliveryMessageOnPersistent(final boolean strictUpdate) thr
consumer = session.createConsumer(ADDRESS);
msg = consumer.receive(1000);
Assert.assertNotNull(msg);
- Assert.assertEquals(2, msg.getDeliveryCount());
+ Assert.assertEquals(strictUpdate ? 1 : 2, msg.getDeliveryCount());
session.close();
}
-
public void testInfiniteDedeliveryMessageOnPersistent() throws Exception
{
internaltestInfiniteDedeliveryMessageOnPersistent(false);
}
-
+
private void internaltestInfiniteDedeliveryMessageOnPersistent(final boolean strict) throws Exception
{
setUp(strict);
@@ -255,9 +254,8 @@ private void internaltestInfiniteDedeliveryMessageOnPersistent(final boolean str
session.commit();
session.close();
-
int expectedCount = 1;
- for (int i = 0 ; i < 700; i++)
+ for (int i = 0; i < 700; i++)
{
session = factory.createSession(false, false, false);
session.start();
@@ -277,10 +275,10 @@ private void internaltestInfiniteDedeliveryMessageOnPersistent(final boolean str
factory.close();
server.stop();
-
+
setUp(false);
-
- for (int i = 0 ; i < 700; i++)
+
+ for (int i = 0; i < 700; i++)
{
session = factory.createSession(false, false, false);
session.start();
@@ -293,27 +291,26 @@ private void internaltestInfiniteDedeliveryMessageOnPersistent(final boolean str
server.stop();
-
- JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(),
- 2,
+ JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(),
+ 2,
0,
- 0,
- new NIOSequentialFileFactory(server.getConfiguration().getJournalDirectory()),
+ 0,
+ new NIOSequentialFileFactory(server.getConfiguration()
+ .getJournalDirectory()),
"hornetq-data",
"hq",
1);
-
-
+
final AtomicInteger updates = new AtomicInteger();
-
+
journal.start();
journal.load(new LoaderCallback()
{
-
+
public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
{
}
-
+
public void updateRecord(RecordInfo info)
{
if (info.userRecordType == JournalStorageManager.UPDATE_DELIVERY_COUNT)
@@ -321,23 +318,22 @@ public void updateRecord(RecordInfo info)
updates.incrementAndGet();
}
}
-
+
public void deleteRecord(long id)
{
}
-
+
public void addRecord(RecordInfo info)
{
}
-
+
public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
{
}
});
-
+
journal.stop();
-
-
+
assertEquals(7, updates.get());
}
View
2  tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
@@ -143,7 +143,7 @@ public void testCrashClient2() throws Exception
Assert.assertNotNull("no message received", messageFromClient);
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
- assertEquals(2, messageFromClient.getDeliveryCount());
+ assertEquals(1, messageFromClient.getDeliveryCount());
session.close();
View
221 tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +25,8 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -39,6 +42,9 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -46,6 +52,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
@@ -263,6 +270,220 @@ public void internaltestSimpleBridge(final boolean largeMessage, final boolean u
}
+
+ public void testLostMessageSimpleMessage() throws Exception
+ {
+ internalTestMessageLoss(false);
+ }
+
+ public void testLostMessageLargeMessage() throws Exception
+ {
+ internalTestMessageLoss(true);
+ }
+
+ /** This test will ignore messages
+ What will cause the bridge to fail with a timeout
+ The bridge should still recover the failure and reconnect on that case */
+ public void internalTestMessageLoss(final boolean largeMessage) throws Exception
+ {
+ class MyInterceptor implements Interceptor
+ {
+ public boolean ignoreSends = true;
+ public CountDownLatch latch;
+
+ MyInterceptor(int numberOfIgnores)
+ {
+ latch = new CountDownLatch(numberOfIgnores);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.Interceptor#intercept(org.hornetq.core.protocol.core.Packet, org.hornetq.spi.core.protocol.RemotingConnection)
+ */
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (ignoreSends && packet instanceof SessionSendMessage ||
+ ignoreSends && packet instanceof SessionSendContinuationMessage && !((SessionSendContinuationMessage)packet).isContinues())
+ {
+ System.out.println("Ignored");
+ latch.countDown();
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ }
+
+ MyInterceptor myInterceptor = new MyInterceptor(3);
+
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+ ServerLocator locator = null;
+ try
+ {
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+
+ HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ connectors.put(server1tc.getName(), server1tc);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ final int messageSize = 1024;
+
+ final int numMessages = 1;
+
+ ArrayList<String> connectorConfig = new ArrayList<String>();
+ connectorConfig.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ 1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
+ 1d,
+ -1,
+ false,
+ // Choose confirmation size to make sure acks
+ // are sent
+ numMessages * messageSize / 2,
+ connec