Skip to content

Commit

Permalink
HORNETQ-927 - cleared up factoires in resource adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
andytaylor committed May 15, 2012
1 parent 60c2481 commit 198f030
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 4 deletions.
11 changes: 7 additions & 4 deletions src/main/org/hornetq/ra/inflow/HornetQActivation.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
Expand Down Expand Up @@ -310,8 +311,9 @@ protected synchronized void setup() throws Exception

try
{
session = setupSession();
HornetQMessageHandler handler = new HornetQMessageHandler(this, ra.getTM(), (ClientSessionInternal) session, i);
ClientSessionFactory cf = factory.getServerLocator().createSessionFactory();
session = setupSession(cf);
HornetQMessageHandler handler = new HornetQMessageHandler(this, ra.getTM(), (ClientSessionInternal) session, cf, i);
handler.setup();
session.start();
handlers.add(handler);
Expand Down Expand Up @@ -375,14 +377,15 @@ protected void setupCF() throws Exception
* Setup a session
* @return The connection
* @throws Exception Thrown if an error occurs
* @param cf
*/
protected ClientSession setupSession() throws Exception
protected ClientSession setupSession(ClientSessionFactory cf) throws Exception
{
ClientSession result = null;

try
{
result = ra.createSession(factory.getServerLocator().createSessionFactory(),
result = ra.createSession(cf,
spec.getAcknowledgeModeInt(),
spec.getUser(),
spec.getPassword(),
Expand Down
17 changes: 17 additions & 0 deletions src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession.QueueQuery;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
Expand Down Expand Up @@ -78,13 +79,17 @@ public class HornetQMessageHandler implements MessageHandler

private final TransactionManager tm;

private ClientSessionFactory cf;

public HornetQMessageHandler(final HornetQActivation activation,
final TransactionManager tm,
final ClientSessionInternal session,
final ClientSessionFactory cf,
final int sessionNr)
{
this.activation = activation;
this.session = session;
this.cf = cf;
this.sessionNr = sessionNr;
this.tm = tm;
}
Expand Down Expand Up @@ -262,6 +267,18 @@ public void teardown()
{
HornetQMessageHandler.log.debug("Error releasing session " + session, t);
}

try
{
if (cf != null)
{
cf.close();
}
}
catch (Throwable t)
{
HornetQMessageHandler.log.debug("Error releasing session factory " + session, t);
}
}

public void onMessage(final ClientMessage message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,30 @@
*/
package org.hornetq.tests.integration.ra;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;

import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;

import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.inflow.HornetQActivation;
import org.hornetq.ra.inflow.HornetQActivationSpec;
import org.hornetq.tests.unit.ra.MessageEndpointFactory;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.DefaultSensitiveStringCodec;

Expand All @@ -32,6 +45,84 @@
*/
public class ResourceAdapterTest extends HornetQRATestBase
{
public void testStartStopActivationManyTimes() throws Exception
{
HornetQServer server = createServer(false);

try
{

server.start();
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false, false);
HornetQDestination queue = (HornetQDestination) HornetQJMSClient.createQueue("test");
session.createQueue(queue.getSimpleAddress(), queue.getSimpleAddress(), true);
session.close();

HornetQResourceAdapter ra = new HornetQResourceAdapter();

ra.setConnectorClassName("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
ra.setUserName("userGlobal");
ra.setPassword("passwordGlobal");
ra.setTransactionManagerLocatorClass("");
ra.setTransactionManagerLocatorMethod("");
ra.start(new org.hornetq.tests.unit.ra.BootstrapContext());

Connection conn = ra.getDefaultHornetQConnectionFactory().createConnection();

conn.close();

HornetQActivationSpec spec = new HornetQActivationSpec();

spec.setResourceAdapter(ra);

spec.setUseJNDI(false);

spec.setUser("user");
spec.setPassword("password");

spec.setDestinationType("Topic");
spec.setDestination("test");

spec.setMinSession(1);
spec.setMaxSession(15);

HornetQActivation activation = new HornetQActivation(ra, new MessageEndpointFactory(), spec);

ServerLocatorImpl serverLocator = (ServerLocatorImpl) ra.getDefaultHornetQConnectionFactory().getServerLocator();

Field f = Class.forName(ServerLocatorImpl.class.getName()).getDeclaredField("factories");

f.setAccessible(true);


Set<ClientSessionFactoryInternal> factories = (Set<ClientSessionFactoryInternal>) f.get(serverLocator);

for (int i = 0; i < 10 ; i++)
{
System.out.println(i);
assertEquals(factories.size(), 0);
activation.start();
assertEquals(factories.size(), 15);
activation.stop();
assertEquals(factories.size(), 0);
}


System.out.println("before RA stop => " + factories.size());
ra.stop();
System.out.println("after RA stop => " + factories.size());
assertEquals(factories.size(), 0);
locator.close();

}
finally
{
server.stop();
}
}

public void testStartStop() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
Expand Down

0 comments on commit 198f030

Please sign in to comment.