Skip to content
Browse files

Merge pull request #1554 from clebertsuconic/master-multiprotocol

  • Loading branch information...
2 parents e435613 + 9cbf4e3 commit dca7e846f00f2540c6cb002711abcb84425f6e6f @andytaylor andytaylor committed
Showing with 3,563 additions and 1,908 deletions.
  1. +8 −0 hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java
  2. +32 −4 hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java
  3. +2 −2 hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java
  4. +44 −0 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/AddressQueryImpl.java
  5. +48 −68 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
  6. +6 −10 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java
  7. +3 −2 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
  8. +5 −0 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java
  9. +53 −73 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java
  10. +423 −716 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
  11. +5 −0 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
  12. +114 −649 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
  13. +20 −28 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java
  14. +0 −131 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
  15. +2 −5 ...-core-client/src/main/java/org/hornetq/core/client/impl/CompressedLargeMessageControllerImpl.java
  16. +26 −39 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java
  17. +1 −2 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/LargeMessageController.java
  18. +62 −32 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
  19. +99 −0 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/QueueQueryImpl.java
  20. +538 −0 ...q-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java
  21. +30 −0 ...client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
  22. +923 −0 hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java
  23. +14 −0 ...rc/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage.java
  24. +2 −13 .../src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
  25. +10 −0 hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java
  26. +73 −0 hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java
  27. +23 −0 hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java
  28. +4 −0 hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java
  29. +36 −0 hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java
  30. +269 −0 hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java
  31. +1 −1 hornetq-core-client/src/main/java/org/hornetq/utils/ConfirmationWindowWarning.java
  32. +1 −1 hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessageProducer.java
  33. +8 −8 hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQSession.java
  34. +5 −0 hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java
  35. +2 −8 hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
  36. +2 −1 hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java
  37. +3 −3 hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java
  38. +1 −1 hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
  39. +3 −2 hornetq-server/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
  40. +1 −1 hornetq-server/src/test/java/org/hornetq/tests/util/UnitTestCase.java
  41. +497 −2 tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java
  42. +19 −0 tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerTest.java
  43. +1 −1 ...integration-tests/src/test/java/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
  44. +8 −8 ...gration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
  45. +49 −5 ...-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
  46. +3 −3 ...ntegration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
  47. +20 −20 tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
  48. +1 −1 tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java
  49. +3 −4 tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java
  50. +2 −2 ...sts/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
  51. +3 −3 ...on-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java
  52. +5 −5 ...egration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
  53. +2 −1 tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
  54. +15 −6 tests/jms-tests/src/test/java/org/hornetq/jms/tests/message/MessageHeaderTest.java
  55. +33 −47 tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
View
8 hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java
@@ -32,6 +32,14 @@
*/
public interface ClientConsumer extends AutoCloseable
{
+
+ /**
+ * The server's ID associated with this consumer.
+ * HornetQ implements this as a long but this could be protocol dependent.
+ * @return
+ */
+ Object getId();
+
/**
* Receives a message from a queue.
* <p>
View
36 hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java
@@ -31,9 +31,9 @@
/**
* Information returned by a binding query
*
- * @see ClientSession#bindingQuery(SimpleString)
+ * @see ClientSession#addressQuery(SimpleString)
*/
- public interface BindingQuery
+ public interface AddressQuery
{
/**
* Returns <code>true</code> if the binding exists, <code>false</code> else.
@@ -47,6 +47,15 @@
}
/**
+ * @deprecated Use {@link org.hornetq.api.core.client.ClientSession.AddressQuery} instead
+ */
+ @Deprecated
+ public interface BindingQuery extends AddressQuery
+ {
+
+ }
+
+ /**
* Information returned by a queue query
*
* @see ClientSession#queueQuery(SimpleString)
@@ -59,6 +68,11 @@
boolean isExists();
/**
+ * Return <code>true</code> if the queue is temporary, <code>false</code> else.
+ */
+ boolean isTemporary();
+
+ /**
* Returns <code>true</code> if the queue is durable, <code>false</code> else.
*/
boolean isDurable();
@@ -82,6 +96,13 @@
* Returns the address that the queue is bound to.
*/
SimpleString getAddress();
+
+ /**
+ * Return the name of the queue
+ *
+ * @return
+ */
+ SimpleString getName();
}
// Lifecycle operations ------------------------------------------
@@ -544,10 +565,10 @@ ClientConsumer createConsumer(SimpleString queueName,
* Queries information on a binding.
*
* @param address the address of the biding to query
- * @return a BindingQuery containing information on the binding attached to the given address
+ * @return a AddressQuery containing information on the binding attached to the given address
* @throws HornetQException if an exception occurs while querying the binding
*/
- BindingQuery bindingQuery(SimpleString address) throws HornetQException;
+ AddressQuery addressQuery(SimpleString address) throws HornetQException;
// Transaction operations ----------------------------------------
@@ -639,4 +660,11 @@ ClientConsumer createConsumer(SimpleString queueName,
* @throws HornetQException
*/
void addUniqueMetaData(String key, String data) throws HornetQException;
+
+ /**
+ * Return the sessionFactory used to created this Session.
+ *
+ * @return
+ */
+ ClientSessionFactory getSessionFactory();
}
View
4 hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java
@@ -13,7 +13,7 @@
package org.hornetq.api.core.client;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.spi.core.protocol.RemotingConnection;
/**
@@ -169,5 +169,5 @@ ClientSession createSession(String username,
*
* @return the core connection
*/
- CoreRemotingConnection getConnection();
+ RemotingConnection getConnection();
}
View
44 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/AddressQueryImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientSession;
+
+public class AddressQueryImpl implements ClientSession.AddressQuery, ClientSession.BindingQuery
+{
+
+ private final boolean exists;
+
+ private final ArrayList<SimpleString> queueNames;
+
+ public AddressQueryImpl(final boolean exists, final List<SimpleString> queueNames)
+ {
+ this.exists = exists;
+ this.queueNames = new ArrayList<SimpleString>(queueNames);
+ }
+
+ public List<SimpleString> getQueueNames()
+ {
+ return queueNames;
+ }
+
+ public boolean isExists()
+ {
+ return exists;
+ }
+}
View
116 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
@@ -26,19 +26,13 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.client.HornetQClientLogger;
import org.hornetq.core.client.HornetQClientMessageBundle;
+import org.hornetq.spi.core.remoting.SessionContext;
import org.hornetq.utils.FutureLatch;
import org.hornetq.utils.PriorityLinkedList;
import org.hornetq.utils.PriorityLinkedListImpl;
@@ -71,7 +65,7 @@
private final ClientSessionInternal session;
- private final Channel channel;
+ private final SessionContext sessionContext;
private final long id;
@@ -133,7 +127,7 @@
private long forceDeliveryCount;
- private final SessionQueueQueryResponseMessage queueInfo;
+ private final ClientSession.QueueQuery queueInfo;
private volatile boolean ackIndividually;
@@ -152,8 +146,8 @@ public ClientConsumerImpl(final ClientSessionInternal session,
final TokenBucketLimiter rateLimiter,
final Executor executor,
final Executor flowControlExecutor,
- final Channel channel,
- final SessionQueueQueryResponseMessage queueInfo,
+ final SessionContext sessionContext,
+ final ClientSession.QueueQuery queueInfo,
final ClassLoader contextClassLoader)
{
this.id = id;
@@ -164,7 +158,7 @@ public ClientConsumerImpl(final ClientSessionInternal session,
this.browseOnly = browseOnly;
- this.channel = channel;
+ this.sessionContext = sessionContext;
this.session = session;
@@ -186,6 +180,11 @@ public ClientConsumerImpl(final ClientSessionInternal session,
// ClientConsumer implementation
// -----------------------------------------------------------------
+ public Object getId()
+ {
+ return id;
+ }
+
private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws HornetQException
{
checkClosed();
@@ -298,7 +297,7 @@ private ClientMessage receive(final long timeout, final boolean forcingDelivery)
HornetQClientLogger.LOGGER.trace("Forcing delivery");
}
// JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
- session.forceDelivery(id, forceDeliveryCount++);
+ sessionContext.forceDelivery(this, forceDeliveryCount++);
callForceDelivery = false;
deliveryForced = true;
continue;
@@ -345,7 +344,7 @@ private ClientMessage receive(final long timeout, final boolean forcingDelivery)
{
m.discardBody();
- session.expire(id, m.getMessageID());
+ session.expire(this, m);
if (clientWindowSize == 0)
{
@@ -459,6 +458,7 @@ public void close() throws HornetQException
/**
* To be used by MDBs
+ *
* @throws HornetQException
*/
public void interruptHandlers() throws HornetQException
@@ -553,7 +553,7 @@ public Exception getLastException()
// ClientConsumerInternal implementation
// --------------------------------------------------------------
- public SessionQueueQueryResponseMessage getQueueInfo()
+ public ClientSession.QueueQuery getQueueInfo()
{
return queueInfo;
}
@@ -578,7 +578,7 @@ public boolean isBrowseOnly()
return browseOnly;
}
- public synchronized void handleMessage(final SessionReceiveMessage message) throws Exception
+ public synchronized void handleMessage(final ClientMessageInternal message) throws Exception
{
if (closing)
{
@@ -586,25 +586,16 @@ public synchronized void handleMessage(final SessionReceiveMessage message) thro
return;
}
- if (message.getMessage().getBooleanProperty(Message.HDR_LARGE_COMPRESSED))
+ if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED))
{
handleCompressedMessage(message);
}
else
{
- handleRegularMessage((ClientMessageInternal)message.getMessage(), message);
+ handleRegularMessage(message);
}
}
- private void handleRegularMessage(final ClientMessageInternal message, final SessionReceiveMessage messagePacket) throws Exception
- {
- message.setDeliveryCount(messagePacket.getDeliveryCount());
-
- message.setFlowControlSize(messagePacket.getPacketSize());
-
- handleRegularMessage(message);
- }
-
private void handleRegularMessage(ClientMessageInternal message)
{
if (message.getAddress() == null)
@@ -644,17 +635,15 @@ private void handleRegularMessage(ClientMessageInternal message)
* Such messages come from message senders who are configured to compress large messages, and
* if some of the messages are compressed below the min-large-message-size limit, they are sent
* as regular messages.
- *
+ * <p/>
* However when decompressing the message, we are not sure how large the message could be..
* for that reason we fake a large message controller that will deal with the message as it was a large message
- *
+ * <p/>
* Say that you sent a 1G message full of spaces. That could be just bellow 100K compressed but you wouldn't have
* enough memory to decompress it
*/
- private void handleCompressedMessage(final SessionReceiveMessage message) throws Exception
+ private void handleCompressedMessage(final ClientMessageInternal clMessage) throws Exception
{
- ClientMessageImpl clMessage = (ClientMessageImpl) message.getMessage();
- //create a ClientLargeMessageInternal out of the message
ClientLargeMessageImpl largeMessage = new ClientLargeMessageImpl();
largeMessage.retrieveExistingData(clMessage);
@@ -680,13 +669,12 @@ private void handleCompressedMessage(final SessionReceiveMessage message) throws
final byte[] body = qbuff.readBytes(bytesToRead).toByteBuffer().array();
largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
- SessionReceiveContinuationMessage packet = new SessionReceiveContinuationMessage(this.getID(), body, false, false, body.length);
- currentLargeMessageController.addPacket(packet);
+ currentLargeMessageController.addPacket(body, body.length, false);
- handleRegularMessage(largeMessage, message);
+ handleRegularMessage(largeMessage);
}
- public synchronized void handleLargeMessage(final SessionReceiveLargeMessage packet) throws Exception
+ public synchronized void handleLargeMessage(final ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
{
if (closing)
{
@@ -695,17 +683,11 @@ public synchronized void handleLargeMessage(final SessionReceiveLargeMessage pac
}
// Flow control for the first packet, we will have others
- ClientLargeMessageInternal currentChunkMessage = (ClientLargeMessageInternal)packet.getLargeMessage();
-
- currentChunkMessage.setFlowControlSize(packet.getPacketSize());
-
- currentChunkMessage.setDeliveryCount(packet.getDeliveryCount());
-
File largeMessageCache = null;
if (session.isCacheLargeMessageClient())
{
- largeMessageCache = File.createTempFile("tmp-large-message-" + currentChunkMessage.getMessageID() + "-",
+ largeMessageCache = File.createTempFile("tmp-large-message-" + clientLargeMessage.getMessageID() + "-",
".tmp");
largeMessageCache.deleteOnExit();
}
@@ -714,21 +696,21 @@ public synchronized void handleLargeMessage(final SessionReceiveLargeMessage pac
ServerLocator locator = sf.getServerLocator();
long callTimeout = locator.getCallTimeout();
- currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), callTimeout, largeMessageCache);
+ currentLargeMessageController = new LargeMessageControllerImpl(this, largeMessageSize, callTimeout, largeMessageCache);
- if (currentChunkMessage.isCompressed())
+ if (clientLargeMessage.isCompressed())
{
- currentChunkMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
+ clientLargeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
}
else
{
- currentChunkMessage.setLargeMessageController(currentLargeMessageController);
+ clientLargeMessage.setLargeMessageController(currentLargeMessageController);
}
- handleRegularMessage(currentChunkMessage);
+ handleRegularMessage(clientLargeMessage);
}
- public synchronized void handleLargeMessageContinuation(final SessionReceiveContinuationMessage chunk) throws Exception
+ public synchronized void handleLargeMessageContinuation(final byte[] chunk, final int flowControlSize, final boolean isContinues) throws Exception
{
if (closing)
{
@@ -738,13 +720,13 @@ public synchronized void handleLargeMessageContinuation(final SessionReceiveCont
{
if (isTrace)
{
- HornetQClientLogger.LOGGER.trace("Sending back credits for largeController = null " + chunk.getPacketSize());
+ HornetQClientLogger.LOGGER.trace("Sending back credits for largeController = null " + flowControlSize);
}
- flowControl(chunk.getPacketSize(), false);
+ flowControl(flowControlSize, false);
}
else
{
- currentLargeMessageController.addPacket(chunk);
+ currentLargeMessageController.addPacket(chunk, flowControlSize, isContinues);
}
}
@@ -764,7 +746,7 @@ public void clear(boolean waitForOnMessage) throws HornetQException
if (message.isLargeMessage())
{
- ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
+ ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal) message;
largeMessage.getLargeMessageController().cancel();
}
@@ -817,7 +799,7 @@ public int getBufferSize()
public void acknowledge(final ClientMessage message) throws HornetQException
{
- ClientMessageInternal cmi = (ClientMessageInternal)message;
+ ClientMessageInternal cmi = (ClientMessageInternal) message;
if (ackIndividually)
{
@@ -845,7 +827,7 @@ public void individualAcknowledge(ClientMessage message) throws HornetQException
flushAcks();
}
- session.individualAcknowledge(id, message.getMessageID());
+ session.individualAcknowledge(this, message);
}
public void flushAcks() throws HornetQException
@@ -857,12 +839,11 @@ public void flushAcks() throws HornetQException
}
/**
- *
- * LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
- * So, this operation needs to be atomic.
- *
- * @param discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet
- */
+ * LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
+ * So, this operation needs to be atomic.
+ *
+ * @param discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet
+ */
public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException
{
if (clientWindowSize >= 0)
@@ -923,7 +904,7 @@ public void flowControl(final int messageBytes, final boolean discountSlowConsum
/**
* Sending a initial credit for slow consumers
- * */
+ */
private void startSlowConsumer()
{
if (isTrace)
@@ -1002,7 +983,7 @@ public void run()
{
try
{
- channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ sessionContext.sendConsumerCredits(ClientConsumerImpl.this, credits);
}
finally
{
@@ -1087,7 +1068,6 @@ private void callOnMessage() throws Exception
}
-
boolean expired = message.isExpired();
flowControlBeforeConsumption(message);
@@ -1148,7 +1128,7 @@ public Object run()
}
else
{
- session.expire(id, message.getMessageID());
+ session.expire(this, message);
}
// If slow consumer, we need to send 1 credit to make sure we get another message
@@ -1214,7 +1194,7 @@ private void doCleanUp(final boolean sendCloseMessage) throws HornetQException
if (sendCloseMessage)
{
- channel.sendBlocking(new SessionConsumerCloseMessage(id), PacketImpl.NULL_RESPONSE);
+ sessionContext.closeConsumer(this);
}
}
catch (Throwable t)
@@ -1236,7 +1216,7 @@ private void doAck(final ClientMessageInternal message) throws HornetQException
lastAckedMessage = null;
- session.acknowledge(id, message.getMessageID());
+ session.acknowledge(this, message);
}
// Inner classes
View
16 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java
@@ -16,17 +16,12 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.hornetq.api.core.client.ClientSession;
/**
- *
* A ClientConsumerInternal
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
*/
public interface ClientConsumerInternal extends ClientConsumer
{
@@ -38,11 +33,11 @@
boolean isBrowseOnly();
- void handleMessage(SessionReceiveMessage message) throws Exception;
+ void handleMessage(ClientMessageInternal message) throws Exception;
- void handleLargeMessage(SessionReceiveLargeMessage largeMessageHeader) throws Exception;
+ void handleLargeMessage(ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception;
- void handleLargeMessageContinuation(SessionReceiveContinuationMessage continuation) throws Exception;
+ void handleLargeMessageContinuation(byte[] chunk, int flowControlSize, boolean isContinues) throws Exception;
void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException;
@@ -50,6 +45,7 @@
/**
* To be called by things like MDBs during shutdown of the server
+ *
* @throws HornetQException
*/
void interruptHandlers() throws HornetQException;
@@ -72,5 +68,5 @@
void start();
- SessionQueueQueryResponseMessage getQueueInfo();
+ ClientSession.QueueQuery getQueueInfo();
}
View
5 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
@@ -205,12 +205,13 @@ public void write(int b) throws IOException
}
}
- public void retrieveExistingData(ClientMessageImpl clMessage)
+ public void retrieveExistingData(ClientMessageInternal clMessage)
{
this.messageID = clMessage.getMessageID();
this.address = clMessage.getAddress();
this.setUserID(clMessage.getUserID());
-
+ this.setFlowControlSize(clMessage.getFlowControlSize());
+ this.setDeliveryCount(clMessage.getDeliveryCount());
this.type = clMessage.getType();
this.durable = clMessage.isDurable();
this.setExpiration(clMessage.getExpiration());
View
5 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java
@@ -10,10 +10,12 @@
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
+
package org.hornetq.core.client.impl;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.utils.TypedProperties;
/**
* A ClientMessageInternal
@@ -22,6 +24,9 @@
*/
public interface ClientMessageInternal extends ClientMessage, MessageInternal
{
+
+ TypedProperties getProperties();
+
/** Size used for FlowControl */
int getFlowControlSize();
View
126 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java
@@ -26,11 +26,8 @@
import org.hornetq.core.client.HornetQClientMessageBundle;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageInternal;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.spi.core.remoting.SessionContext;
import org.hornetq.utils.DeflaterReader;
import org.hornetq.utils.HornetQBufferInputStream;
import org.hornetq.utils.TokenBucketLimiter;
@@ -49,7 +46,7 @@
private final ClientSessionInternal session;
- private final Channel channel;
+ private final SessionContext sessionContext;
private volatile boolean closed;
@@ -79,9 +76,9 @@ public ClientProducerImpl(final ClientSessionInternal session,
final boolean autoGroup,
final SimpleString groupID,
final int minLargeMessageSize,
- final Channel channel)
+ final SessionContext sessionContext)
{
- this.channel = channel;
+ this.sessionContext = sessionContext;
this.session = session;
@@ -239,8 +236,8 @@ private void doSend(final SimpleString address1, final Message msg, final SendAc
// a note about the second check on the writerIndexSize,
// If it's a server's message, it means this is being done through the bridge or some special consumer on the
// server's on which case we can't' convert the message into large at the servers
- if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
- msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())
+ if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
+ msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage()))
{
isLarge = true;
}
@@ -320,23 +317,16 @@ private void sendRegularMessage(final MessageInternal msgI, final boolean sendBl
// Not the continuations, but this is ok since we are only interested in limiting the amount of
// data in *memory* and continuations go straight to the disk
- theCredits.acquireCredits(msgI.getEncodeSize());
+ int creditSize = sessionContext.getCreditsOnSendingFull(msgI);
+
+ theCredits.acquireCredits(creditSize);
}
catch (InterruptedException e)
{
throw new HornetQInterruptedException(e);
}
- SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler);
-
- if (sendBlocking)
- {
- channel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
- }
- else
- {
- channel.sendBatched(packet);
- }
+ sessionContext.sendFullMessage(msgI, sendBlocking, handler);
}
private void checkClosed() throws HornetQException
@@ -354,9 +344,8 @@ private void checkClosed() throws HornetQException
* @param handler
* @throws HornetQException
*/
- private void
- largeMessageSend(final boolean sendBlocking, final MessageInternal msgI,
- final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws HornetQException
+ private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI,
+ final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws HornetQException
{
int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
@@ -387,16 +376,16 @@ else if ((input = msgI.getBodyInputStream()) != null)
}
}
- private void
- sendInitialLargeMessageHeader(MessageInternal msgI, ClientProducerCredits credits) throws HornetQException
+ private void sendInitialLargeMessageHeader(MessageInternal msgI, ClientProducerCredits credits) throws HornetQException
{
- SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(msgI);
-
- channel.send(initialChunk);
+ int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI);
+ // On the case of large messages we tried to send credits before but we would starve otherwise
+ // we may find a way to improve the logic and always acquire the credits before
+ // but that's the way it's been tested and been working ATM
try
{
- credits.acquireCredits(msgI.getHeadersAndPropertiesEncodeSize());
+ credits.acquireCredits(creditsUsed);
}
catch (InterruptedException e)
{
@@ -413,9 +402,8 @@ else if ((input = msgI.getBodyInputStream()) != null)
* @param handler
* @throws HornetQException
*/
- private void
- largeMessageSendServer(final boolean sendBlocking, final MessageInternal msgI,
- final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws HornetQException
+ private void largeMessageSendServer(final boolean sendBlocking, final MessageInternal msgI,
+ final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws HornetQException
{
sendInitialLargeMessageHeader(msgI, credits);
@@ -440,25 +428,13 @@ else if ((input = msgI.getBodyInputStream()) != null)
pos += chunkLength;
lastChunk = pos >= bodySize;
- final boolean requiresResponse = lastChunk && sendBlocking;
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
- final SessionSendContinuationMessage chunk =
- new SessionSendContinuationMessage(msgI, bodyBuffer.toByteBuffer().array(), !lastChunk,
- requiresResponse, messageHandler);
- if (requiresResponse)
- {
- // When sending it blocking, only the last chunk will be blocking.
- channel.sendBlocking(chunk, PacketImpl.NULL_RESPONSE);
- }
- else
- {
- channel.send(chunk);
- }
+ int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
try
{
- credits.acquireCredits(chunk.getPacketSize());
+ credits.acquireCredits(creditsUsed);
}
catch (InterruptedException e)
{
@@ -580,38 +556,42 @@ private void largeMessageSendStreamed(final boolean sendBlocking, final MessageI
sendRegularMessage(msgI, sendBlocking, credits, handler);
return;
}
-
- chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking, messageSize.get(), handler);
+ else
+ {
+ if (!headerSent)
+ {
+ headerSent = true;
+ sendInitialLargeMessageHeader(msgI, credits);
+ }
+ int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, handler);
+ try
+ {
+ credits.acquireCredits(creditsSent);
+ }
+ catch (InterruptedException e)
+ {
+ throw new HornetQInterruptedException(e);
+ }
+ }
}
else
{
- chunk = new SessionSendContinuationMessage(msgI, buff, true, false, null);
- }
-
- if (!headerSent)
- {
- sendInitialLargeMessageHeader(msgI, credits);
- headerSent = true;
- }
-
+ if (!headerSent)
+ {
+ headerSent = true;
+ sendInitialLargeMessageHeader(msgI, credits);
+ }
- if (sendBlocking && lastPacket)
- {
- // When sending it blocking, only the last chunk will be blocking.
- channel.sendBlocking(chunk, PacketImpl.NULL_RESPONSE);
- }
- else
- {
- channel.send(chunk);
- }
- try
- {
- credits.acquireCredits(chunk.getPacketSize());
- }
- catch (InterruptedException e)
- {
- throw new HornetQInterruptedException(e);
+ int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, handler);
+ try
+ {
+ credits.acquireCredits(creditsSent);
+ }
+ catch (InterruptedException e)
+ {
+ throw new HornetQInterruptedException(e);
+ }
}
}
View
1,139 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
423 additions, 716 deletions not shown because the diff is too large. Please use a local Git client to view these changes.
View
5 ...-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
@@ -12,6 +12,8 @@
*/
package org.hornetq.core.client.impl;
+import java.util.concurrent.locks.Lock;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
@@ -57,4 +59,7 @@
void setReconnectAttempts(int i);
ConfirmationWindowWarning getConfirmationWindowWarning();
+
+
+ Lock lockFailover();
}
View
763 hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
@@ -15,13 +15,9 @@
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -36,62 +32,16 @@
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.FailoverEventListener;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.HornetQClientLogger;
import org.hornetq.core.client.HornetQClientMessageBundle;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.CommandConfirmationHandler;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
-import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionCloseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionExpireMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.spi.core.remoting.SessionContext;
import org.hornetq.utils.ConfirmationWindowWarning;
-import org.hornetq.utils.IDGenerator;
-import org.hornetq.utils.SimpleIDGenerator;
import org.hornetq.utils.TokenBucketLimiterImpl;
import org.hornetq.utils.XidCodecSupport;
@@ -102,7 +52,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
*/
-public final class ClientSessionImpl implements ClientSessionInternal, FailureListener, CommandConfirmationHandler
+public final class ClientSessionImpl implements ClientSessionInternal, FailureListener
{
private final Map<String, String> metadata = new HashMap<String, String>();
@@ -121,8 +71,6 @@
// to be sent to consumers as consumers will need a separate consumer for flow control
private final Executor flowControlExecutor;
- private volatile CoreRemotingConnection remotingConnection;
-
/**
* All access to producers are guarded (i.e. synchronized) on itself.
*/
@@ -165,21 +113,15 @@
private final boolean cacheLargeMessageClient;
- private final Channel channel;
-
- private final int version;
+ private final SessionContext sessionContext;
// For testing only
private boolean forceNotSameRM;
- private final IDGenerator idGenerator = new SimpleIDGenerator(0);
-
private final ClientProducerCreditManager producerCreditManager;
private volatile boolean started;
- private SendAcknowledgementHandler sendAckHandler;
-
private volatile boolean rollbackOnly;
private volatile boolean workDone;
@@ -192,8 +134,6 @@
private volatile SimpleString defaultAddress;
- private boolean xaRetry = false;
-
/**
* Current XID. this will be used in case of failover
*/
@@ -227,9 +167,7 @@
final boolean compressLargeMessages,
final int initialMessagePacketSize,
final String groupID,
- final CoreRemotingConnection remotingConnection,
- final int version,
- final Channel channel,
+ final SessionContext sessionContext,
final Executor executor,
final Executor flowControlExecutor) throws HornetQException
{
@@ -241,8 +179,6 @@
this.password = password;
- this.remotingConnection = remotingConnection;
-
this.executor = executor;
this.flowControlExecutor = flowControlExecutor;
@@ -259,10 +195,6 @@
this.autoGroup = autoGroup;
- this.channel = channel;
-
- this.version = version;
-
this.ackBatchSize = ackBatchSize;
this.consumerWindowSize = consumerWindowSize;
@@ -288,10 +220,8 @@
this.groupID = groupID;
producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize);
- if (confirmationWindowSize >= 0)
- {
- this.channel.setCommandConfirmationHandler(this);
- }
+
+ this.sessionContext = sessionContext;
confirmationWindowWarning = sessionFactory.getConfirmationWindowWarning();
}
@@ -299,11 +229,6 @@
// ClientSession implementation
// -----------------------------------------------------------------
- public Channel getChannel()
- {
- return channel;
- }
-
public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException
{
internalCreateQueue(address, queueName, null, false, false);
@@ -335,12 +260,10 @@ public void createSharedQueue(SimpleString address,
checkClosed();
- CreateSharedQueueMessage request = new CreateSharedQueueMessage(address, queueName, filterString, durable, true);
-
startCall();
try
{
- channel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
+ sessionContext.createSharedQueue(address, queueName, filterString, durable);
}
finally
{
@@ -401,7 +324,7 @@ public void deleteQueue(final SimpleString queueName) throws HornetQException
startCall();
try
{
- channel.sendBlocking(new SessionDeleteQueueMessage(queueName), PacketImpl.NULL_RESPONSE);
+ sessionContext.deleteQueue(queueName);
}
finally
{
@@ -418,20 +341,11 @@ public QueueQuery queueQuery(final SimpleString queueName) throws HornetQExcepti
{
checkClosed();
- SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
-
startCall();
try
{
- SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) channel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
-
- return new QueueQueryImpl(response.isDurable(),
- response.getConsumerCount(),
- response.getMessageCount(),
- response.getFilterString(),
- response.getAddress(),
- response.isExists());
+ return sessionContext.queueQuery(queueName);
}
finally
{
@@ -440,23 +354,25 @@ public QueueQuery queueQuery(final SimpleString queueName) throws HornetQExcepti
}
+ /**
+ * Use {@link #addressQuery(org.hornetq.api.core.SimpleString)} instead
+ *
+ * @param address
+ * @return
+ * @throws HornetQException
+ */
+ @Deprecated
public BindingQuery bindingQuery(final SimpleString address) throws HornetQException
{
- checkClosed();
-
- SessionBindingQueryMessage request = new SessionBindingQueryMessage(address);
-
- SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) channel.sendBlocking(request, PacketImpl.SESS_BINDINGQUERY_RESP);
-
- return new BindingQueryImpl(response.isExists(), response.getQueueNames());
+ return (BindingQuery) addressQuery(address);
}
- public void forceDelivery(final long consumerID, final long sequence) throws HornetQException
+ public AddressQuery addressQuery(final SimpleString address) throws HornetQException
{
checkClosed();
- SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
- channel.send(request);
+
+ return sessionContext.addressQuery(address);
}
public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException
@@ -606,7 +522,7 @@ public void commit() throws HornetQException
}
try
{
- channel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
+ sessionContext.simpleCommit();
}
catch (HornetQException e)
{
@@ -673,7 +589,7 @@ public void rollback(final boolean isLastMessageAsDelivered) throws HornetQExcep
// Acks must be flushed here *after connection is stopped and all onmessages finished executing
flushAcks();
- channel.sendBlocking(new RollbackMessage(isLastMessageAsDelivered), PacketImpl.NULL_RESPONSE);
+ sessionContext.simpleRollback(isLastMessageAsDelivered);
if (wasStarted)
{
@@ -747,7 +663,7 @@ public void start() throws HornetQException
clientConsumerInternal.start();
}
- channel.send(new PacketImpl(PacketImpl.SESS_START));
+ sessionContext.sessionStart();
started = true;
}
@@ -769,7 +685,7 @@ public void stop(final boolean waitForOnMessage) throws HornetQException
clientConsumerInternal.stop(waitForOnMessage);
}
- channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP), PacketImpl.NULL_RESPONSE);
+ sessionContext.sessionStop();
started = false;
}
@@ -797,7 +713,12 @@ public boolean removeFailoverListener(FailoverEventListener listener)
public int getVersion()
{
- return version;
+ return sessionContext.getServerVersion();
+ }
+
+ public boolean isClosing()
+ {
+ return inClose;
}
// ClientSessionInternal implementation
@@ -829,7 +750,7 @@ public String getName()
/**
* Acknowledges all messages received by the consumer so far.
*/
- public void acknowledge(final long consumerID, final long messageID) throws HornetQException
+ public void acknowledge(final ClientConsumer consumer, final Message message) throws HornetQException
{
// if we're pre-acknowledging then we don't need to do anything
if (preAcknowledge)
@@ -840,21 +761,13 @@ public void acknowledge(final long consumerID, final long messageID) throws Horn
checkClosed();
if (HornetQClientLogger.LOGGER.isDebugEnabled())
{
- HornetQClientLogger.LOGGER.debug("client ack messageID = " + messageID);
+ HornetQClientLogger.LOGGER.debug("client ack messageID = " + message.getMessageID());
}
- SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID, messageID, blockOnAcknowledge);
startCall();
try
{
- if (blockOnAcknowledge)
- {
- channel.sendBlocking(message, PacketImpl.NULL_RESPONSE);
- }
- else
- {
- channel.sendBatched(message);
- }
+ sessionContext.sendACK(false, blockOnAcknowledge, consumer, message);
}
finally
{
@@ -862,7 +775,7 @@ public void acknowledge(final long consumerID, final long messageID) throws Horn
}
}
- public void individualAcknowledge(final long consumerID, final long messageID) throws HornetQException
+ public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws HornetQException
{
// if we're pre-acknowledging then we don't need to do anything
if (preAcknowledge)
@@ -872,21 +785,11 @@ public void individualAcknowledge(final long consumerID, final long messageID) t
checkClosed();
- SessionIndividualAcknowledgeMessage message = new SessionIndividualAcknowledgeMessage(consumerID,
- messageID,
- blockOnAcknowledge);
-
startCall();
try
{
- if (blockOnAcknowledge)
- {
- channel.sendBlocking(message, PacketImpl.NULL_RESPONSE);
- }
- else
- {
- channel.sendBatched(message);
- }
+
+ sessionContext.sendACK(true, blockOnAcknowledge, consumer, message);
}
finally
{
@@ -894,16 +797,14 @@ public void individualAcknowledge(final long consumerID, final long messageID) t
}
}
- public void expire(final long consumerID, final long messageID) throws HornetQException
+ public void expire(final ClientConsumer consumer, final Message message) throws HornetQException
{
checkClosed();
// We don't send expiries for pre-ack since message will already have been acked on server
if (!preAcknowledge)
{
- SessionExpireMessage message = new SessionExpireMessage(consumerID, messageID);
-
- channel.send(message);
+ sessionContext.expireMessage(consumer, message);
}
}
@@ -939,39 +840,33 @@ public void removeProducer(final ClientProducerInternal producer)
}
}
- public void handleReceiveMessage(final long consumerID, final SessionReceiveMessage message) throws Exception
+ public void handleReceiveMessage(final long consumerID, final ClientMessageInternal message) throws Exception
{
ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
- ClientMessageInternal clMessage = (ClientMessageInternal) message.getMessage();
-
- clMessage.setDeliveryCount(message.getDeliveryCount());
-
- clMessage.setFlowControlSize(message.getPacketSize());
-
consumer.handleMessage(message);
}
}
- public void handleReceiveLargeMessage(final long consumerID, final SessionReceiveLargeMessage message) throws Exception
+ public void handleReceiveLargeMessage(final long consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception
{
ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
- consumer.handleLargeMessage(message);
+ consumer.handleLargeMessage(clientLargeMessage, largeMessageSize);
}
}
- public void handleReceiveContinuation(final long consumerID, final SessionReceiveContinuationMessage continuation) throws Exception
+ public void handleReceiveContinuation(final long consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception
{
ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
- consumer.handleLargeMessageContinuation(continuation);
+ consumer.handleLargeMessageContinuation(chunk, flowControlSize, isContinues);
}
}
@@ -1022,7 +917,7 @@ public void close() throws HornetQException
producerCreditManager.close();
}
inClose = true;
- channel.sendBlocking(new SessionCloseMessage(), PacketImpl.NULL_RESPONSE);
+ sessionContext.sessionClose();
}
catch (Throwable e)
{
@@ -1051,22 +946,20 @@ public synchronized void cleanUp(boolean failingOver) throws HornetQException
public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
{
- channel.setCommandConfirmationHandler(this);
-
- sendAckHandler = handler;
+ sessionContext.setSendAcknowledgementHandler(handler);
}
- public void preHandleFailover(CoreRemotingConnection connection)
+ public void preHandleFailover(RemotingConnection connection)
{
// We lock the channel to prevent any packets to be added to the re-send
// cache during the failover process
//we also do this before the connection fails over to give the session a chance to block for failover
- channel.lock();
+ sessionContext.lockCommunications();
}
// Needs to be synchronized to prevent issues with occurring concurrently with close()
- public void handleFailover(final CoreRemotingConnection backupConnection)
+ public void handleFailover(final RemotingConnection backupConnection)
{
synchronized (this)
{
@@ -1079,31 +972,12 @@ public void handleFailover(final CoreRemotingConnection backupConnection)
try
{
- channel.transferConnection(backupConnection);
-
- backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
- remotingConnection = backupConnection;
-
- int lcid = channel.getLastConfirmedCommandID();
-
- Packet request = new ReattachSessionMessage(name, lcid);
-
- Channel channel1 = backupConnection.getChannel(1, -1);
- ReattachSessionResponseMessage response = (ReattachSessionResponseMessage) channel1.sendBlocking(request, PacketImpl.REATTACH_SESSION_RESP);
+ // TODO remove this and encapsulate it
- if (response.isReattached())
- {
- if (HornetQClientLogger.LOGGER.isDebugEnabled())
- {
- HornetQClientLogger.LOGGER.debug("ClientSession reattached fine, replaying commands");
- }
- // The session was found on the server - we reattached transparently ok
+ boolean reattached = sessionContext.reattachOnNewConnection(backupConnection);
- channel.replayCommands(response.getLastConfirmedCommandID());
- }
- else
+ if (!reattached)
{
if (HornetQClientLogger.LOGGER.isDebugEnabled())
@@ -1130,93 +1004,16 @@ public void handleFailover(final CoreRemotingConnection backupConnection)
// to recreate the session, we just want to unblock the blocking call
if (!inClose && mayAttemptToFailover)
{
- Packet createRequest = new CreateSessionMessage(name,
- channel.getID(),
- version,
- username,
- password,
- minLargeMessageSize,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- confirmationWindowSize,
- defaultAddress == null ? null
- : defaultAddress.toString());
- boolean retry = false;
- do
- {
- try
- {
- channel1.sendBlocking(createRequest, PacketImpl.CREATESESSION_RESP);
- retry = false;
- }
- catch (HornetQException e)
- {
- // the session was created while its server was starting, retry it:
- if (e.getType() == HornetQExceptionType.SESSION_CREATION_REJECTED)
- {
- HornetQClientLogger.LOGGER.retryCreateSessionSeverStarting(name);
- retry = true;
- // sleep a little bit to avoid spinning too much
- Thread.sleep(10);
- }
- else
- {
- throw e;
- }
- }
- }
- while (retry && !inClose);
+ sessionContext.recreateSession(username, password,
+ minLargeMessageSize, xa, autoCommitSends,
+ autoCommitAcks, preAcknowledge, defaultAddress);
- channel.clearCommands();
-
- for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
+ for (Map.Entry<Long, ClientConsumerInternal> entryx : consumers.entrySet())
{
- SessionQueueQueryResponseMessage queueInfo = entry.getValue().getQueueInfo();
-
- // We try and recreate any non durable queues, since they probably won't be there unless
- // they are defined in hornetq-configuration.xml
- // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
- if (!queueInfo.isDurable())
- {
- CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(),
- queueInfo.getName(),
- queueInfo.getFilterString(),
- false,
- queueInfo.isTemporary(),
- false);
-
- sendPacketWithoutLock(createQueueRequest);
- }
-
- SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
- entry.getValue()
- .getQueueName(),
- entry.getValue()
- .getFilterString(),
- entry.getValue()
- .isBrowseOnly(),
- false);
-
- sendPacketWithoutLock(createConsumerRequest);
- int clientWindowSize = entry.getValue().getClientWindowSize();
+ ClientConsumerInternal consumerInternal = entryx.getValue();
- if (clientWindowSize != 0)
- {
- SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
- clientWindowSize);
-
- sendPacketWithoutLock(packet);
- }
- else
- {
- // https://jira.jboss.org/browse/HORNETQ-522
- SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
- 1);
- sendPacketWithoutLock(packet);
- }
+ sessionContext.recreateConsumerOnServer(consumerInternal);
}
if ((!autoCommitAcks || !autoCommitSends) && workDone)
@@ -1227,7 +1024,7 @@ public void handleFailover(final CoreRemotingConnection backupConnection)
}
if (currentXID != null)
{
- sendPacketWithoutLock(new SessionXAAfterFailedMessage(currentXID));
+ sessionContext.xaFailed(currentXID);
rollbackOnly = true;
}
@@ -1240,21 +1037,13 @@ public void handleFailover(final CoreRemotingConnection backupConnection)
consumer.start();
}
- Packet packet = new PacketImpl(PacketImpl.SESS_START);
-
- packet.setChannelID(channel.getID());
-
- Connection conn = channel.getConnection().getTransportConnection();
-
- HornetQBuffer buffer = packet.encode(channel.getConnection());
-
- conn.write(buffer, false, false);
+ sessionContext.restartSession();
}
resetCreditManager = true;
}
- channel.returnBlocking();
+ sessionContext.returnBlocking();
}
}
catch (Throwable t)
@@ -1263,8 +1052,7 @@ public void handleFailover(final CoreRemotingConnection backupConnection)
}
finally
{
- channel.setTransferring(false);
- channel.unlock();
+ sessionContext.releaseCommunications();
}
if (resetCreditManager)
@@ -1283,11 +1071,9 @@ public void handleFailover(final CoreRemotingConnection backupConnection)
metaDataToSend = new HashMap<String, String>(metadata);
}
- // Resetting the metadata after failover
- for (Map.Entry<String, String> entries : metaDataToSend.entrySet())
- {
- sendPacketWithoutLock(new SessionAddMetaDataMessageV2(entries.getKey(), entries.getValue(), false));
- }
+ sessionContext.resetMetadata(metaDataToSend);
+
+
}
public void addMetaData(String key, String data) throws HornetQException
@@ -1297,15 +1083,15 @@ public void addMetaData(String key, String data) throws HornetQException
metadata.put(key, data);
}
- channel.sendBlocking(new SessionAddMetaDataMessageV2(key, data), PacketImpl.NULL_RESPONSE);
+ sessionContext.addSessionMetadata(key, data);
}
public void addUniqueMetaData(String key, String data) throws HornetQException
{
- channel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data), PacketImpl.NULL_RESPONSE);
+ sessionContext.addUniqueMetaData(key, data);
}
- public ClientSessionFactoryInternal getSessionFactory()
+ public ClientSessionFactory getSessionFactory()
{
return sessionFactory;
}
@@ -1339,30 +1125,14 @@ public void setPacketSize(final int packetSize)
}
}
- private void sendPacketWithoutLock(final Packet packet)
- {
- packet.setChannelID(channel.getID());
-
- Connection conn = channel.getConnection().getTransportConnection();
-
- HornetQBuffer buffer = packet.encode(channel.getConnection());
-
- conn.write(buffer, false, false);
- }
-
public void workDone()
{
workDone = true;
}
- public void returnBlocking()
- {
- channel.returnBlocking();
- }
-
public void sendProducerCreditsMessage(final int credits, final SimpleString address)
{
- channel.send(new SessionRequestProducerCreditsMessage(credits, address));
+ sessionContext.sendProducerCreditsMessage(credits, address);
}
public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
@@ -1405,34 +1175,7 @@ public void endCall()
// CommandConfirmationHandler implementation ------------------------------------
- public void commandConfirmed(final Packet packet)
- {
- if (packet.getType() == PacketImpl.SESS_SEND)
- {
- SessionSendMessage ssm = (SessionSendMessage) packet;
- callSendAck(ssm.getHandler(), ssm.getMessage());
- }
- else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
- {
- SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
- if (!scm.isContinues())
- {
- callSendAck(scm.getHandler(), scm.getMessage());
- }
- }
- }
-
- private void callSendAck(SendAcknowledgementHandler handler, final Message message)
- {
- if (handler != null)
- {
- handler.sendAcknowledged(message);
- }
- else if (sendAckHandler != null)
- {
- sendAckHandler.sendAcknowledged(message);
- }
- }
+ // TODO: this will be encapsulated by the SessionContext
// XAResource implementation
// --------------------------------------------------------------------
@@ -1454,31 +1197,17 @@ public void commit(final Xid xid, final boolean onePhase) throws XAException
// Note - don't need to flush acks since the previous end would have
// done this
- SessionXACommitMessage packet = new SessionXACommitMessage(xid, onePhase);
-
startCall();
try
{
- SessionXAResponseMessage response = (SessionXAResponseMessage) channel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+ sessionContext.xaCommit(xid, onePhase);
workDone = false;
-
- if (response.isError())
- {
- throw new XAException(response.getResponseCode());
- }
-
- if (HornetQClientLogger.LOGGER.isTraceEnabled())
- {
- HornetQClientLogger.LOGGER.trace("finished commit on " + convert(xid) + " with response = " + response);
- }
}
catch (HornetQException e)
{
HornetQClientLogger.LOGGER.failoverDuringCommit();
- // Unblocked on failover
- xaRetry = true;
// Any error on commit -> RETRY
// We can't rollback a Prepared TX for definition
throw new XAException(XAException.XA_RETRY);
@@ -1515,42 +1244,17 @@ public void end(final Xid xid, final int flags) throws XAException
try
{
- Packet packet;
-
- if (flags == XAResource.TMSUSPEND)
- {
- packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
- }
- else if (flags == XAResource.TMSUCCESS)
- {
- packet = new SessionXAEndMessage(xid, false);
- }
- else if (flags == XAResource.TMFAIL)
- {
- packet = new SessionXAEndMessage(xid, true);
- }
- else
- {
- throw new XAException(XAException.XAER_INVAL);
- }
-
flushAcks();
- SessionXAResponseMessage response;
startCall();
try
{
- response = (SessionXAResponseMessage) channel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+ sessionContext.xaEnd(xid, flags);
}
finally
{
endCall();
}
-
- if (response.isError())
- {
- throw new XAException(response.getResponseCode());
- }
}
catch (HornetQException e)
{
@@ -1571,12 +1275,7 @@ public void forget(final Xid xid) throws XAException
startCall();
try
{
- SessionXAResponseMessage response = (SessionXAResponseMessage) channel.sendBlocking(new SessionXAForgetMessage(xid), PacketImpl.SESS_XA_RESP);
-
- if (response.isError())
- {
- throw new XAException(response.getResponseCode());
- }
+ sessionContext.xaForget(xid);
}
catch (HornetQException e)
{
@@ -1595,9 +1294,22 @@ public int getTransactionTimeout() throws XAException
try
{
- SessionXAGetTimeoutResponseMessage response = (SessionXAGetTimeoutResponseMessage) channel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT), PacketImpl.SESS_XA_GET_TIMEOUT_RESP);
+ return sessionContext.recoverSessionTimeout();
+ }
+ catch (HornetQException e)
+ {
+ // This should never occur
+ throw new XAException(XAException.XAER_RMERR);
+ }
+ }
- return response.getTimeoutSeconds();
+ public boolean setTransactionTimeout(final int seconds) throws XAException
+ {
+ checkXA();
+
+ try
+ {
+ return sessionContext.configureTransactionTimeout(seconds);
}
catch (HornetQException e)
{
@@ -1606,6 +1318,7 @@ public int getTransactionTimeout() throws XAException
}
}
+
public boolean isSameRM(final XAResource xares) throws XAException
{
checkXA();
@@ -1642,22 +1355,10 @@ public int prepare(final Xid xid) throws XAException
// Note - don't need to flush acks since the previous end would have
// done this
- SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid);
-
startCall();
try
{
- SessionXAResponseMessage response = (SessionXAResponseMessage) channel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
-
- if (response.isError())
- {
- throw new XAException(response.getResponseCode());
- }
- else
- {
- xaRetry = false;
- return response.getResponseCode();
- }
+ return sessionContext.xaPrepare(xid);
}
catch (HornetQException e)
{
@@ -1666,16 +1367,8 @@ public int prepare(final Xid xid) throws XAException
// Unblocked on failover
try
{
- HornetQClientLogger.LOGGER.failoverDuringPrepare();
- SessionXAResponseMessage response = (SessionXAResponseMessage) channel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
-
- if (response.isError())
- {
- throw new XAException(response.getResponseCode());
- }
-
- xaRetry = false;
- return response.getResponseCode();
+ // will retry once after failover & unblock
+ return sessionContext.xaPrepare(xid);
}
catch (HornetQException e1)
{
@@ -1716,13 +1409,7 @@ public int prepare(final Xid xid) throws XAException
{
try
{
- SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage) channel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS), PacketImpl.SESS_XA_INDOUBT_XIDS_RESP);
-
- List<Xid> xids = response.getXids();
-
- Xid[] xidArray = xids.toArray(new Xid[xids.size()]);
-
- return xidArray;
+ return sessionContext.xaScan();
}
catch (HornetQException e)
{
@@ -1760,28 +1447,25 @@ public void rollback(final Xid xid) throws XAException
flushAcks();
- SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
-
- SessionXAResponseMessage response = (SessionXAResponseMessage) channel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
-
- if (wasStarted)
+ try
{
- start();
+ sessionContext.xaRollback(xid, wasStarted);
}
-
- workDone = false;
-
- if (response.isError())
+ finally
{
- throw new XAException(response.getResponseCode());
+ if (wasStarted)
+ {
+ start();
+ }
}
+
+ workDone = false;
}
catch (HornetQException e)
{
if (e.getType() == HornetQExceptionType.UNBLOCKED)
{
// Unblocked on failover
- xaRetry = true;
throw new XAException(XAException.XA_RETRY);
}
// This should never occur
@@ -1789,23 +1473,6 @@ public void rollback(final Xid xid) throws XAException
}
}
- public boolean setTransactionTimeout(final int seconds) throws XAException
- {
- checkXA();
-
- try
- {
- SessionXASetTimeoutResponseMessage response = (SessionXASetTimeoutResponseMessage) channel.sendBlocking(new SessionXASetTimeoutMessage(seconds), PacketImpl.SESS_XA_SET_TIMEOUT_RESP);
-
- return response.isOK();
- }
- catch (HornetQException e)
- {
- // This should never occur
- throw new XAException(XAException.XAER_RMERR);
- }
- }
-
public void start(final Xid xid, final int flags) throws XAException
{
if (HornetQClientLogger.LOGGER.isTraceEnabled())
@@ -1815,37 +1482,12 @@ public void start(final Xid xid, final int flags) throws XAException
checkXA();
- Packet packet = null;
-
try
{
- if (flags == XAResource.TMJOIN)
- {
- packet = new SessionXAJoinMessage(xid);
- }
- else if (flags == XAResource.TMRESUME)
- {
- packet = new SessionXAResumeMessage(xid);
- }
- else if (flags == XAResource.TMNOFLAGS)
- {
- // Don't need to flush since the previous end will have done this
- packet = new SessionXAStartMessage(xid);
- }
- else
- {
- throw new XAException(XAException.XAER_INVAL);
- }
- SessionXAResponseMessage response = (SessionXAResponseMessage) channel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+ sessionContext.xaStart(xid, flags);
this.currentXID = xid;
-
- if (response.isError())
- {
- HornetQClientLogger.LOGGER.errorCallingStart(response.getMessage(), response.getResponseCode());
- throw new XAException(response.getResponseCode());
- }
}
catch (HornetQException e)
{
@@ -1854,13 +1496,7 @@ else if (flags == XAResource.TMNOFLAGS)
{
try
{
- SessionXAResponseMessage response = (SessionXAResponseMessage) channel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
-
- if (response.isError())
- {
- HornetQClientLogger.LOGGER.errorCallingStart(response.getMessage(), response.getResponseCode());
- throw new XAException(response.getResponseCode());
- }
+ sessionContext.xaStart(xid, flags);
}
catch (HornetQException e1)
{
@@ -1897,7 +1533,7 @@ public void setForceNotSameRM(final boolean force)
public RemotingConnection getConnection()
{
- return remotingConnection;
+ return sessionContext.getRemotingConnection();
}
@Override
@@ -1924,38 +1560,6 @@ public String toString()
Integer.toHexString(hashCode());
}