Skip to content

Commit

Permalink
https://issues.jboss.org/browse/HORNETQ-705 - fixing queue iterations…
Browse files Browse the repository at this point in the history
… with multiple priorities
  • Loading branch information
clebertsuconic committed May 28, 2011
1 parent 4ea02be commit b00f2e5
Show file tree
Hide file tree
Showing 10 changed files with 514 additions and 126 deletions.
2 changes: 0 additions & 2 deletions src/main/org/hornetq/core/server/cluster/Bridge.java
Expand Up @@ -44,8 +44,6 @@ public interface Bridge extends Consumer, HornetQComponent


void activate(); void activate();


void setQueue(Queue queue);

void setNotificationService(NotificationService notificationService); void setNotificationService(NotificationService notificationService);


RemotingConnection getForwardingConnection(); RemotingConnection getForwardingConnection();
Expand Down
30 changes: 21 additions & 9 deletions src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Expand Up @@ -64,6 +64,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------


private static final Logger log = Logger.getLogger(BridgeImpl.class); private static final Logger log = Logger.getLogger(BridgeImpl.class);

private static final boolean isTrace = log.isTraceEnabled();


// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------


Expand All @@ -77,7 +79,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled


private final SimpleString name; private final SimpleString name;


private Queue queue; private final Queue queue;


protected final Executor executor; protected final Executor executor;


Expand Down Expand Up @@ -222,6 +224,8 @@ public void stop() throws Exception
} }
} }


log.info("Bridge " + this.name + " being stopped");

stopping = true; stopping = true;


executor.execute(new StopRunnable()); executor.execute(new StopRunnable());
Expand Down Expand Up @@ -266,11 +270,6 @@ public Queue getQueue()
return queue; return queue;
} }


public void setQueue(final Queue queue)
{
this.queue = queue;
}

public Filter getFilter() public Filter getFilter()
{ {
return filter; return filter;
Expand Down Expand Up @@ -367,20 +366,25 @@ public HandleStatus handle(final MessageReference ref) throws Exception
{ {
return HandleStatus.NO_MATCH; return HandleStatus.NO_MATCH;
} }

synchronized (this) synchronized (this)
{ {
if (!active) if (!active)
{ {
log.debug(name + "::Ignoring reference on bridge as it is set to iniactive ref=" + ref);
return HandleStatus.BUSY; return HandleStatus.BUSY;
} }


if (isTrace)
{
log.trace("Bridge " + name + " is handling reference=" + ref);
}
ref.handled(); ref.handled();


ServerMessage message = ref.getMessage(); ServerMessage message = ref.getMessage();


refs.add(ref); refs.add(ref);

message = beforeForward(message); message = beforeForward(message);


SimpleString dest; SimpleString dest;
Expand Down Expand Up @@ -419,11 +423,13 @@ public HandleStatus handle(final MessageReference ref) throws Exception


public void connectionFailed(final HornetQException me, boolean failedOver) public void connectionFailed(final HornetQException me, boolean failedOver)
{ {
log.warn(name + "::Connection failed with failedOver=" + failedOver, me);
fail(false); fail(false);
} }


public void beforeReconnect(final HornetQException exception) public void beforeReconnect(final HornetQException exception)
{ {
log.warn(name + "::Connection failed before reconnect ", exception);
fail(true); fail(true);
} }


Expand Down Expand Up @@ -454,8 +460,11 @@ private void fail(final boolean beforeReconnect)
// we want to cancel all unacked refs so they get resent // we want to cancel all unacked refs so they get resent
// duplicate detection will ensure no dups are routed on the other side // duplicate detection will ensure no dups are routed on the other side


log.debug(name + "::BridgeImpl::fail being called, beforeReconnect=" + beforeReconnect);

if (session.getConnection().isDestroyed()) if (session.getConnection().isDestroyed())
{ {
log.debug(name + "::Connection is destroyed, active = false now");
active = false; active = false;
} }


Expand All @@ -467,7 +476,7 @@ private void fail(final boolean beforeReconnect)
{ {
synchronized (this) synchronized (this)
{ {
active = false; log.debug(name + "::Connection is destroyed, active = false now");
} }


cancelRefs(); cancelRefs();
Expand All @@ -476,6 +485,7 @@ private void fail(final boolean beforeReconnect)
{ {
afterConnect(); afterConnect();


log.debug(name + "::After reconnect, setting active=true now");
active = true; active = true;


if (queue != null) if (queue != null)
Expand Down Expand Up @@ -650,6 +660,8 @@ public void run()
{ {
return; return;
} }

log.debug("Closing Session for bridge " + BridgeImpl.this.name);


if (session != null) if (session != null)
{ {
Expand Down
Expand Up @@ -16,8 +16,12 @@
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED; import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED; import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;


import java.util.*; import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;


import org.hornetq.api.core.DiscoveryGroupConfiguration; import org.hornetq.api.core.DiscoveryGroupConfiguration;
Expand All @@ -29,7 +33,6 @@
import org.hornetq.api.core.management.ManagementHelper; import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType; import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal; import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger; import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding; import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings; import org.hornetq.core.postoffice.Bindings;
Expand Down Expand Up @@ -272,6 +275,8 @@ public void stop() throws Exception
{ {
serverLocator.removeClusterTopologyListener(this); serverLocator.removeClusterTopologyListener(this);
} }

log.debug("Cluster connection being stopped for node" + nodeUUID);


synchronized (this) synchronized (this)
{ {
Expand Down Expand Up @@ -357,6 +362,8 @@ public synchronized void activate() throws Exception
serverLocator.setBackup(server.getConfiguration().isBackup()); serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1); serverLocator.setInitialConnectAttempts(-1);
serverLocator.setConfirmationWindowSize(0); serverLocator.setConfirmationWindowSize(0);
serverLocator.setBlockOnDurableSend(false);
serverLocator.setBlockOnNonDurableSend(false);


if(retryInterval > 0) if(retryInterval > 0)
{ {
Expand Down Expand Up @@ -388,6 +395,7 @@ public TransportConfiguration getConnector()


public synchronized void nodeDown(final String nodeID) public synchronized void nodeDown(final String nodeID)
{ {
log.debug("node " + nodeID + " being considered down on cluster connection for nodeID=" + nodeUUID);
if (nodeID.equals(nodeUUID.toString())) if (nodeID.equals(nodeUUID.toString()))
{ {
return; return;
Expand Down

0 comments on commit b00f2e5

Please sign in to comment.