Skip to content

Commit

Permalink
cells: Allow local delivery of messages through queue routes
Browse files Browse the repository at this point in the history
Motivation:

The cells framework has two boolean flags to restrict local and remote delivery
of messages.  The current implementation would skip the use of routes if remote
delivery was disabled (with the exception of topic routes). This causes
messages to named queues to fail to be delivered to local consumers if remote
delivery is disabled. This is the case when the message was received from a
core domain.

Modification:

Restructure the routing loop such that only delivery through routes to domain
targets are suppressed. This is the same logic also used for topic routes.

Result:

Fixed an issue with message delivery to named queues affecting consumers
whose cell name is different from the queue name.

Target: trunk
Require-notes: yes
Require-book: no
Request: 2.16
Acked-by: Paul Millar <paul.millar@desy.de>

Reviewed at https://rb.dcache.org/r/9438/
  • Loading branch information
gbehrmann committed Jun 20, 2016
1 parent d3d5fc2 commit e4f1b42
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
25 changes: 10 additions & 15 deletions modules/cells/src/main/java/dmg/cells/nucleus/CellGlue.java
Expand Up @@ -356,7 +356,8 @@ private void _kill(CellNucleus source, final CellNucleus destination, long to)
*
* @param msg The cell envelope
* @param resolveLocally Whether to deliver messages for @local addresses to local cells
* @param resolveRemotely Whether to deliver messages for @local addresses to remote cells
* @param resolveRemotely Whether to deliver messages for @local addresses through routes with
* a domain address as a target
* @throws SerializationException
*/
void sendMessage(CellMessage msg, boolean resolveLocally, boolean resolveRemotely)
Expand Down Expand Up @@ -411,7 +412,7 @@ private void sendMessage(CellMessage msg, CellAddressCore address, boolean resol
/* If the address if not fully qualified we have the choice of resolving
* it locally or through the routing table.
*/
if (address.getCellDomainName().equals("local")) {
if (address.isLocalAddress()) {
if (resolveLocally && deliverLocally(msg, address)) {
return;
}
Expand All @@ -431,13 +432,6 @@ private void sendMessage(CellMessage msg, CellAddressCore address, boolean resol
}
hasTopicRoutes = true;
}

if (!resolveRemotely) {
if (!hasTopicRoutes) {
sendException(msg, address.toString());
}
return;
}
}

/* Unless we updated the destination path, there is no reason to send the message back to where
Expand All @@ -452,14 +446,9 @@ private void sendMessage(CellMessage msg, CellAddressCore address, boolean resol
return;
}

/* The delivery restrictions do not apply to routes.
*/
resolveLocally = true;
resolveRemotely = true;

/* Lookup a route.
*/
CellRoute route = _routingTable.find(address);
CellRoute route = _routingTable.find(address, resolveRemotely);
if (route == null) {
LOGGER.trace("sendMessage : no route destination for : {}", address);
if (!hasTopicRoutes) {
Expand All @@ -477,6 +466,12 @@ private void sendMessage(CellMessage msg, CellAddressCore address, boolean resol
destination.replaceCurrent(address);
hasDestinationChanged = true;
}

/* The delivery restrictions do not apply to routes.
*/
resolveLocally = true;
resolveRemotely = true;

steps--;
}
// end of big iteration loop
Expand Down
Expand Up @@ -994,7 +994,7 @@ public void routeDelete(CellRoute route) throws IllegalArgumentException {
__cellGlue.routeDelete(route);
}
CellRoute routeFind(CellAddressCore addr) {
return __cellGlue.getRoutingTable().find(addr);
return __cellGlue.getRoutingTable().find(addr, true);
}
public CellRoutingTable getRoutingTable() { return __cellGlue.getRoutingTable(); }
public CellRoute [] getRoutingList() { return __cellGlue.getRoutingList(); }
Expand Down
Expand Up @@ -24,6 +24,7 @@

import org.dcache.util.ColumnWriter;

import static com.google.common.collect.Iterables.*;
import static java.util.stream.Collectors.toList;

public class CellRoutingTable implements Serializable
Expand Down Expand Up @@ -199,7 +200,7 @@ private void delete(Collection<CellRoute> values, String addr, Collection<CellRo
}
}

public CellRoute find(CellAddressCore addr)
public CellRoute find(CellAddressCore addr, boolean allowRemote)
{
String cellName = addr.getCellName();
String domainName = addr.getCellDomainName();
Expand All @@ -217,7 +218,11 @@ public CellRoute find(CellAddressCore addr)
//
synchronized (_queue) {
List<CellRoute> routes = _queue.get(cellName);
if (!routes.isEmpty()) {
if (!allowRemote) {
CellRoute[] localRoutes =
routes.stream().filter(r -> !r.getTarget().isDomainAddress()).toArray(CellRoute[]::new);
return (localRoutes.length > 0) ? localRoutes[_random.nextInt(localRoutes.length)] : null;
} else if (!routes.isEmpty()) {
return routes.get(_random.nextInt(routes.size()));
}
}
Expand Down

0 comments on commit e4f1b42

Please sign in to comment.