Skip to content
Permalink
Browse files

JBPAPP-8366 & JBPAPP-8377 - fixing leaks and duplicated resource on TM

  • Loading branch information...
clebertsuconic committed Mar 9, 2012
1 parent c92a8dd commit be93b3180c234b4def4d70606cfc4ac3847fb0ef
@@ -112,6 +112,18 @@ public static ServerLocator createServerLocatorWithoutHA(TransportConfiguration.
return new ServerLocatorImpl(false, transportConfigurations);
}

/**
* Create a ServerLocator which creates session factories using a static list of transportConfigurations, the ServerLocator is not updated automatically
* as the cluster topology changes, and no HA backup information is propagated to the client
*
* @param transportConfigurations
* @return the ServerLocator
*/
public static ServerLocator createServerLocator(final boolean ha, TransportConfiguration... transportConfigurations)
{
return new ServerLocatorImpl(ha, transportConfigurations);
}

/**
* Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
*
@@ -126,6 +138,20 @@ public static ServerLocator createServerLocatorWithoutHA(final DiscoveryGroupCon
return new ServerLocatorImpl(false, groupConfiguration);
}

/**
* Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
*
* The UDP address and port are used to listen for live servers in the cluster
*
* @param discoveryAddress The UDP group address to listen for updates
* @param discoveryPort the UDP port to listen for updates
* @return the ServerLocator
*/
public static ServerLocator createServerLocator(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
{
return new ServerLocatorImpl(ha, groupConfiguration);
}

/**
* Create a ServerLocator which will receive cluster topology updates from the cluster as servers leave or join and new backups are appointed or removed.
* The initial list of servers supplied in this method is simply to make an initial connection to the cluster, once that connection is made, up to date
@@ -0,0 +1,102 @@
/*
* Copyright 2010 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.hornetq.jms.server.recovery;

import java.util.HashMap;

import org.hornetq.core.logging.Logger;
import org.jboss.tm.XAResourceRecoveryRegistry;

/**
* This class is a base class for the integration layer where
* we verify if a given connection factory already have a recovery registered
*
* @author Clebert
*
*
*/
public abstract class HornetQRegistryBase implements RecoveryRegistry
{
// Constants -----------------------------------------------------

private static final Logger log = Logger.getLogger(HornetQRegistryBase.class);

// Attributes ----------------------------------------------------

private static HashMap<XARecoveryConfig, HornetQResourceRecovery> configSet = new HashMap<XARecoveryConfig, HornetQResourceRecovery>();

// Static --------------------------------------------------------

// Constructors --------------------------------------------------

// Public --------------------------------------------------------

public abstract XAResourceRecoveryRegistry getTMRegistry();

public HornetQResourceRecovery register(final HornetQResourceRecovery resourceRecovery)
{
synchronized (configSet)
{
HornetQResourceRecovery recovery = configSet.get(resourceRecovery.getConfig());

if (recovery == null)
{
recovery = resourceRecovery;
if (log.isDebugEnabled())
{
log.debug("Registering a new recovery for " + recovery.getConfig() + ", recovery = " + resourceRecovery);
}
configSet.put(resourceRecovery.getConfig(), resourceRecovery);
getTMRegistry().addXAResourceRecovery(recovery);
}
else
{
if (log.isDebugEnabled())
{
log.debug("Return pre-existent recovery=" + recovery + " for configuration = " + resourceRecovery.getConfig());
}
}
recovery.incrementUsage();
return recovery;
}
}



public void unRegister(final HornetQResourceRecovery resourceRecovery)
{
synchronized (configSet)
{
HornetQResourceRecovery recFound = configSet.get(resourceRecovery.getConfig());

if (recFound != null && recFound.decrementUsage() == 0)
{
if (log.isDebugEnabled())
{
log.debug("Removing recovery information for " + recFound + " as all the deployments were already removed");
}
getTMRegistry().removeXAResourceRecovery(recFound);
configSet.remove(resourceRecovery);
}
}
}

// Package protected ---------------------------------------------

// Protected -----------------------------------------------------

// Private -------------------------------------------------------
// Inner classes -------------------------------------------------

}
@@ -32,28 +32,52 @@
public class HornetQResourceRecovery implements XAResourceRecovery
{
private final XARecoveryConfig config;
private XAResource[] xaResources;

private final XAResource[] xaResources;

private int usage;

public HornetQResourceRecovery(XARecoveryConfig config)
{
this.config = config;
xaResources = new XAResource[]{new HornetQXAResourceWrapper(config)};
this.xaResources = new XAResource[] { new HornetQXAResourceWrapper(config) };
}

public XAResource[] getXAResources()
{
return xaResources;
}

public XARecoveryConfig getConfig()
{
return config;
}

/** we may have several connection factories referencing the same connection recovery entry.
* Because of that we need to make a count of the number of the instances that are referencing it,
* so we will remove it as soon as we are done */
public synchronized int incrementUsage()
{
return ++usage;
}

public synchronized int decrementUsage()
{
return --usage;
}

@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;

HornetQResourceRecovery that = (HornetQResourceRecovery) o;
HornetQResourceRecovery that = (HornetQResourceRecovery)o;

if (config != null ? !config.equals(that.config) : that.config != null) return false;
if (config != null ? !config.equals(that.config) : that.config != null)
return false;

return true;
}
@@ -22,6 +22,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
@@ -242,7 +243,7 @@ public void beforeReconnect(final HornetQException me)
* @return the connectionFactory
* @throws XAException for any problem
*/
public XAResource getDelegate(boolean retry) throws XAException
private XAResource getDelegate(boolean retry) throws XAException
{
XAResource result = null;
Exception error = null;
@@ -316,7 +317,14 @@ protected XAResource connect() throws Exception

try
{
serverLocator = xaRecoveryConfig.getHornetQConnectionFactory().getServerLocator();
if (xaRecoveryConfig.getDiscoveryConfiguration() != null)
{
serverLocator = HornetQClient.createServerLocator(xaRecoveryConfig.isHA(), xaRecoveryConfig.getDiscoveryConfiguration());
}
else
{
serverLocator = HornetQClient.createServerLocator(xaRecoveryConfig.isHA(), xaRecoveryConfig.getTransportConfig());
}
serverLocator.disableFinalizeCheck();
csf = serverLocator.createSessionFactory();
if (xaRecoveryConfig.getUsername() == null)
@@ -334,10 +342,29 @@ protected XAResource connect() throws Exception
1);
}
}
catch (HornetQException e)
catch (Throwable e)
{
log.warn("Can't connect to " + xaRecoveryConfig + " on auto-generated resource recovery", e);
if (log.isDebugEnabled())
{
log.debug(e.getMessage(), e);
}

try
{
if (cs != null) cs.close();
if (serverLocator != null) serverLocator.close();
}
catch (Throwable ignored)
{
if (log.isTraceEnabled())
{
log.trace(e.getMessage(), ignored);
}
}
continue;
}

cs.addFailureListener(this);

synchronized (HornetQXAResourceWrapper.lock)
@@ -392,9 +419,9 @@ public void close()
oldServerLocator.close();
}
}
catch (Exception ignored)
catch (Throwable ignored)
{
HornetQXAResourceWrapper.log.trace("Ignored error during close", ignored);
HornetQXAResourceWrapper.log.debug("Ignored error during close", ignored);
}
}

@@ -410,10 +437,9 @@ protected XAException check(final XAException e) throws XAException
{
log.warn(e.getMessage(), e);

if (e.errorCode == XAException.XA_RETRY)
{
close();
}

// If any exception happened, we close the connection so we may start fresh
close();
throw e;
}

@@ -28,7 +28,7 @@
*/
public interface RecoveryRegistry
{
void register(HornetQResourceRecovery resourceRecovery);
HornetQResourceRecovery register(HornetQResourceRecovery resourceRecovery);

void unRegister(HornetQResourceRecovery xaRecoveryConfig);
}
Oops, something went wrong.

0 comments on commit be93b31

Please sign in to comment.
You can’t perform that action at this time.