Permalink
Browse files

Removing cached sessions on our Resource adapter

  • Loading branch information...
1 parent 528c947 commit d6c3d518a5181e3ea1d33b0f9b5dd493c1f4b0ae @clebertsuconic clebertsuconic committed Jun 15, 2010
View
2 src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
@@ -519,7 +519,7 @@ public void acknowledge(final boolean autoCommitAcks, final Transaction tx, fina
closed);
}
- if (autoCommitAcks)
+ if (autoCommitAcks || tx == null)
{
ref.getQueue().acknowledge(ref);
}
View
10 src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
@@ -510,11 +510,6 @@ public void acknowledge(final long consumerID, final long messageID) throws Exce
{
ServerConsumer consumer = consumers.get(consumerID);
- if (this.xa && tx == null)
- {
- throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
- }
-
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
@@ -1169,11 +1164,6 @@ private void doSend(final ServerMessage msg, final boolean direct) throws Except
throw e;
}
- if (this.xa && tx == null)
- {
- throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
- }
-
if (tx == null || autoCommitSends)
{
}
View
95 src/main/org/hornetq/ra/HornetQRAManagedConnection.java
@@ -26,10 +26,8 @@
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
-import javax.jms.QueueConnection;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
-import javax.jms.TopicConnection;
import javax.jms.XAConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XASession;
@@ -44,7 +42,9 @@
import javax.resource.spi.ManagedConnectionMetaData;
import javax.resource.spi.SecurityException;
import javax.security.auth.Subject;
+import javax.transaction.Status;
import javax.transaction.SystemException;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
@@ -95,12 +95,10 @@
// auto-commit session, used outside XA or Local transaction
private Session session;
- private Session transactedSession;
-
private XASession xaSession;
private XAResource xaResource;
-
+
private final TransactionManager tm;
private boolean inManagedTx;
@@ -133,7 +131,6 @@ public HornetQRAManagedConnection(final HornetQRAManagedConnectionFactory mcf,
connection = null;
session = null;
- transactedSession = null;
xaSession = null;
xaResource = null;
@@ -234,7 +231,7 @@ public void destroy() throws ResourceException
HornetQRAManagedConnection.log.trace("destroy()");
}
- if (isDestroyed.get() || connection == null)
+ if (isDestroyed.get() || connection == null)
{
return;
}
@@ -261,11 +258,6 @@ public void destroy() throws ResourceException
session.close();
}
- if (transactedSession != null)
- {
- transactedSession.close();
- }
-
if (xaSession != null)
{
xaSession.close();
@@ -306,7 +298,7 @@ public void cleanup() throws ResourceException
destroyHandles();
inManagedTx = false;
-
+
// I'm recreating the lock object when we return to the pool
// because it looks too nasty to expect the connection handle
// to unlock properly in certain race conditions
@@ -339,6 +331,35 @@ public void associateConnection(final Object obj) throws ResourceException
}
}
+ public void checkTransactionActive() throws JMSException
+ {
+ // don't bother looking at the transaction if there's an active XID
+ if (!inManagedTx && tm != null)
+ {
+ try
+ {
+ Transaction tx = tm.getTransaction();
+ if (tx != null)
+ {
+ int status = tx.getStatus();
+ // Only allow states that will actually succeed
+ if (status != Status.STATUS_ACTIVE && status != Status.STATUS_PREPARING &&
+ status != Status.STATUS_PREPARED &&
+ status != Status.STATUS_COMMITTING)
+ {
+ throw new javax.jms.IllegalStateException("Transaction " + tx + " not active");
+ }
+ }
+ }
+ catch (SystemException e)
+ {
+ JMSException jmsE = new javax.jms.IllegalStateException("Unexpected exception on the Transaction ManagerTransaction");
+ jmsE.initCause(e);
+ throw jmsE;
+ }
+ }
+ }
+
/**
* Aqquire a lock on the managed connection
*/
@@ -441,7 +462,7 @@ public XAResource getXAResource() throws ResourceException
//
if (xaResource == null)
{
- xaResource = xaSession.getXAResource();
+ xaResource = xaSession.getXAResource();
}
if (HornetQRAManagedConnection.trace)
@@ -566,36 +587,7 @@ public void onException(final JMSException exception)
*/
protected Session getSession() throws JMSException
{
- if (xaResource != null && isManagedTx())
- {
- if (HornetQRAManagedConnection.trace)
- {
- HornetQRAManagedConnection.log.trace("getSession() -> XA session " + xaSession.getSession());
- }
-
- return xaSession.getSession();
- }
- else
- {
- if (isManagedTx())
- {
- if (HornetQRAManagedConnection.trace)
- {
- HornetQRAManagedConnection.log.trace("getSession() -> transactedSession " + transactedSession);
- }
-
- return transactedSession;
- }
- else
- {
- if (HornetQRAManagedConnection.trace)
- {
- HornetQRAManagedConnection.log.trace("getSession() -> session " + session);
- }
-
- return session;
- }
- }
+ return session;
}
/**
@@ -748,8 +740,8 @@ private void setup() throws ResourceException
try
{
boolean transacted = cri.isTransacted();
- int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
+ int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
if (cri.getType() == HornetQRAConnectionFactory.TOPIC_CONNECTION)
{
if (userName != null && password != null)
@@ -764,8 +756,7 @@ private void setup() throws ResourceException
connection.setExceptionListener(this);
xaSession = ((XATopicConnection)connection).createXATopicSession();
- transactedSession = ((TopicConnection)connection).createTopicSession(transacted, acknowledgeMode);
- session = ((TopicConnection)connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ session = xaSession.getSession();
}
else if (cri.getType() == HornetQRAConnectionFactory.QUEUE_CONNECTION)
{
@@ -781,8 +772,7 @@ else if (cri.getType() == HornetQRAConnectionFactory.QUEUE_CONNECTION)
connection.setExceptionListener(this);
xaSession = ((XAQueueConnection)connection).createXAQueueSession();
- transactedSession = ((QueueConnection)connection).createQueueSession(transacted, acknowledgeMode);
- session = ((QueueConnection)connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ session = xaSession.getSession();
}
else
{
@@ -798,16 +788,15 @@ else if (cri.getType() == HornetQRAConnectionFactory.QUEUE_CONNECTION)
connection.setExceptionListener(this);
xaSession = ((XAConnection)connection).createXASession();
- transactedSession = connection.createSession(transacted, acknowledgeMode);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session = xaSession.getSession();
}
}
catch (JMSException je)
{
throw new ResourceException(je.getMessage(), je);
}
}
-
+
private boolean isManagedTx()
{
return inManagedTx || isXA();
View
1 src/main/org/hornetq/ra/HornetQRAMessageConsumer.java
@@ -96,6 +96,7 @@ void checkState() throws JMSException
{
HornetQRAMessageConsumer.log.trace("checkState()");
}
+ session.checkState();
}
/**
View
1 src/main/org/hornetq/ra/HornetQRAMessageProducer.java
@@ -402,6 +402,7 @@ public void setTimeToLive(final long timeToLive) throws JMSException
*/
void checkState() throws JMSException
{
+ session.checkState();
}
/**
View
12 src/main/org/hornetq/ra/HornetQRASession.java
@@ -49,6 +49,8 @@
import javax.jms.XATopicSession;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
import javax.transaction.xa.XAResource;
import org.hornetq.core.logging.Logger;
@@ -1612,4 +1614,14 @@ TopicSession getTopicSessionInternal() throws JMSException
}
return (TopicSession)s;
}
+
+ /**
+ * @throws SystemException
+ * @throws RollbackException
+ *
+ */
+ public void checkState() throws JMSException
+ {
+ mc.checkTransactionActive();
+ }
}
View
8 src/main/org/hornetq/ra/Util.java
@@ -197,7 +197,13 @@ public static Object lookup(final Context context, final String name, final Clas
/** The Resource adapter can't depend on any provider's specific library. Because of that we use reflection to locate the
- * transaction manager during startup. */
+ * transaction manager during startup.
+ *
+ *
+ * TODO: https://jira.jboss.org/browse/HORNETQ-417
+ * We should use a proper SPI instead of reflection
+ * We would need to define a proper SPI package for this.
+ * */
public static TransactionManager locateTM(final String locatorClass, final String locatorMethod)
{
try
View
6 tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
@@ -138,7 +138,7 @@ public void testSendWithoutXID() throws Exception
ClientConsumer cons = session.createConsumer("Test");
- assertNull("Send went through an invalid XA Session", cons.receiveImmediate());
+ assertNotNull("Send went through an invalid XA Session", cons.receiveImmediate());
}
finally
{
@@ -194,9 +194,7 @@ public void testACKWithoutXID() throws Exception
msg = cons.receiveImmediate();
- assertNotNull("Acknowledge went through invalid XA Session", msg);
-
- assertNull(cons.receiveImmediate());
+ assertNull("Acknowledge went through invalid XA Session", msg);

0 comments on commit d6c3d51

Please sign in to comment.