Skip to content

Commit

Permalink
httpd: Use full cell address for login broker messaging
Browse files Browse the repository at this point in the history
This patch is similar to http://rb.dcache.org/r/5117/, except that this
patch applies to login broker related classes.

Target: trunk
Require-notes: no
Require-book: no
Acked-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
Patch: http://rb.dcache.org/r/5119/
  • Loading branch information
Gerd Behrmann committed Jan 24, 2013
1 parent bf10dfc commit 327b428
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import java.io.Serializable;

import static com.google.common.base.Preconditions.checkNotNull;

/**
*
* Is the core of the CellDomain addressing scheme. The
Expand Down Expand Up @@ -65,8 +67,9 @@ public CellAddressCore(String addr) {
}

public CellAddressCore(String addr, String domain) {
checkNotNull(addr);
_cell = addr;
_domain = domain;
_domain = (domain == null) ? "local" : domain;
_hashcode = (_domain + _cell).hashCode();
}

Expand All @@ -89,8 +92,7 @@ public String getCellDomainName() {

@Override
public String toString() {
return (_cell != null ? _cell : "UnknownCell") + "@"
+ (_domain != null ? _domain : "UnknownDomain");
return _cell + "@" + _domain;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.*;
import java.io.*;
import java.lang.reflect.*;
import java.util.concurrent.ConcurrentHashMap;

import dmg.util.*;
import dmg.cells.nucleus.*;
Expand Down Expand Up @@ -130,18 +131,19 @@ private String mapOwner(String owner)

private static class DoorHandler
{
private final Map<String, Entry> _doors = new HashMap<>();
private final Map<CellAddressCore, Entry> _doors = new ConcurrentHashMap<>();

private synchronized Entry defineDoor(String doorName)
private synchronized Entry defineDoor(CellAddressCore address)
{
Entry entry = _doors.get(doorName);
Entry entry = _doors.get(address);
if (entry == null) {
_doors.put(doorName, entry = new Entry(doorName, true));
entry = new Entry(address, true);
_doors.put(address, entry);
}
return entry;
}

private Set<String> doors()
private Set<CellAddressCore> doors()
{
return _doors.keySet();
}
Expand All @@ -151,41 +153,38 @@ private Collection<Entry> entries()
return _doors.values();
}

private synchronized Entry undefineDoor(String doorName)
private Entry undefineDoor(CellAddressCore address)
{
Entry entry = _doors.get(doorName);
Entry entry = _doors.get(address);
if (entry != null) {
entry.setFixed(false);
}
return entry;
}

private synchronized Entry addDoor(String doorName)
private synchronized Entry addDoor(CellAddressCore door)
{
Entry entry = _doors.get(doorName);
Entry entry = _doors.get(door);
if (entry == null) {
_doors.put(doorName, entry = new Entry(doorName, false));
entry = new Entry(door, false);
_doors.put(door, entry);
}
return entry;
}

private synchronized Entry setDoorInfo(LoginManagerChildrenInfo info)
private Entry setDoorInfo(LoginManagerChildrenInfo info)
{
String doorName = info.getCellName()+"@"+info.getCellDomainName();
Entry entry = _doors.get(doorName);
if (entry == null) {
_doors.put(doorName, entry = new Entry(doorName));
}
Entry entry =
addDoor(new CellAddressCore(info.getCellName(), info.getCellDomainName()));
entry.setChildInfo(info);
return entry;
}

private synchronized void clear()
{
Iterator<Map.Entry<String,Entry>> i = _doors.entrySet().iterator();
Iterator<Entry> i = _doors.values().iterator();
while (i.hasNext()) {
Map.Entry<String,Entry> e = i.next();
Entry entry = e.getValue();
Entry entry = i.next();
if (entry.isFixed()) {
entry.setChildInfo(null);
} else {
Expand All @@ -197,18 +196,13 @@ private synchronized void clear()
private static class Entry
{
private boolean _isFixed;
private String _doorName;
private CellAddressCore _doorAddress;
private LoginManagerChildrenInfo _info;

private Entry(String doorName)
{
this(doorName, false);
}

private Entry(String doorName, boolean isFixed)
private Entry(CellAddressCore doorAddress, boolean isFixed)
{
_isFixed = isFixed;
_doorName = doorName;
_doorAddress = doorAddress;
}

private LoginManagerChildrenInfo getChildInfo()
Expand Down Expand Up @@ -253,7 +247,7 @@ public TransferObserverV1(String name, String args) throws Exception
String doorList = _args.getOpt("doors");
if (doorList != null) {
for (String s : doorList.split(",")) {
_doors.defineDoor(s);
_doors.defineDoor(new CellAddressCore(s));
}
}
//
Expand Down Expand Up @@ -531,14 +525,14 @@ public void run()
}
}

private Object request(String path, Serializable message)
private Object request(CellAddressCore address, Serializable message)
throws Exception
{
CellMessage request = new CellMessage(new CellPath(path), message);
CellMessage request = new CellMessage(new CellPath(address), message);

request = sendAndWait(request, _timeout);
if (request == null) {
throw new Exception(path + " reply timed out");
throw new Exception(address + " reply timed out");
}

return request.getMessageObject();
Expand All @@ -555,17 +549,18 @@ private void getBrokerInfo()
for (String loginBroker : _loginBroker.split(",")) {
_log.info("Requesting doorInfo from LoginBroker " + loginBroker);
try {
CellAddressCore brokerAddress = new CellAddressCore(loginBroker);
LoginBrokerInfo [] infos =
(LoginBrokerInfo [])request(loginBroker, "ls -binary -all");
(LoginBrokerInfo [])request(brokerAddress, "ls -binary -all");

StringBuilder sb = new StringBuilder();
sb.append("LoginBroker (").append(loginBroker)
.append(") : ");
for (LoginBrokerInfo info : infos) {
String doorName =
info.getCellName()+"@"+ info.getDomainName();
_doors.addDoor(doorName);
sb.append(doorName).append(",");
CellAddressCore doorAddress =
new CellAddressCore(info.getCellName(), info.getDomainName());
_doors.addDoor(doorAddress);
sb.append(doorAddress).append(",");
}
_log.info(sb.toString());
infoList.addAll(Arrays.asList(infos));
Expand All @@ -584,18 +579,15 @@ private void collectDataSequentially()
getBrokerInfo();

_log.info("Asking doors for 'doorClientList' (one by one)");
for (String doorName : _doors.doors()) {
_log.info("Requesting client list from : " + doorName);
for (CellAddressCore doorAddress : _doors.doors()) {
_log.info("Requesting client list from : {}", doorAddress);
try {
LoginManagerChildrenInfo info = (LoginManagerChildrenInfo)
request(doorName, "get children -binary");

_log.info(doorName + " reported about " + info.getChildrenCount() +
" children");

request(doorAddress, "get children -binary");
_log.info(doorAddress + " reported about {} children", info.getChildrenCount());
_doors.setDoorInfo(info);
} catch (Exception e) {
_doors.undefineDoor(doorName);
_doors.undefineDoor(doorAddress);
_log.info("Exception : " + e);
}
}
Expand All @@ -611,10 +603,10 @@ private void collectDataSequentially()
continue;
}

for ( String child: info.getChildren()) {
String childDoor = child+"@"+info.getCellDomainName() ;
for (String child: info.getChildren()) {
CellAddressCore childDoor = new CellAddressCore(child, info.getCellDomainName());

_log.info("Requesting client info from : " + childDoor);
_log.info("Requesting client info from: {}", childDoor);
try {
IoDoorInfo ioDoorInfo = (IoDoorInfo)
request(childDoor,"get door info -binary");
Expand All @@ -627,7 +619,7 @@ private void collectDataSequentially()
}

for (IoDoorEntry ioDoorEntry : ioDoorEntries) {
_log.info("Adding ioEntry : " + ioDoorEntry);
_log.info("Adding ioEntry: {}", ioDoorEntry);
ioList.put(childDoor + "#" + ioDoorEntry.getSerialId(),
new IoEntry(ioDoorInfo, ioDoorEntry));
String pool = ioDoorEntry.getPool();
Expand All @@ -638,18 +630,18 @@ private void collectDataSequentially()
}

} catch (Exception e) {
_log.info("Exception : " + e);
_log.info("Exception: {}", e);
}
}
}
_log.info("Asking pools for io info");
for (String poolName : poolHash) {
_log.info("Asking pool : " + poolName);
_log.info("Asking pool: {}", poolName);
try {
IoJobInfo [] infos = (IoJobInfo [] )
request(poolName, "mover ls -binary");
IoJobInfo [] infos = (IoJobInfo [])
request(new CellAddressCore(poolName), "mover ls -binary");

_log.info(poolName + " reply ok");
_log.info("{} reply ok", poolName);

//
// where is our client
Expand All @@ -659,13 +651,13 @@ private void collectDataSequentially()
info.getClientId() ;
IoEntry ioEntry = ioList.get(client);
if (ioEntry == null) {
_log.info("No entry found for : " + client);
_log.info("No entry found for {}", client);
} else {
ioEntry._ioJobInfo = info;
}
}
} catch (Exception e) {
_log.info("Exception : " + e);
_log.info("Exception: {}", e);
}
}
List<IoEntry> resultList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -44,7 +45,7 @@ public class WebCollectorV3 extends CellAdapter implements Runnable
private Thread _senderThread;
private long _counter;
private int _repeatHeader = 30;
private String[] _loginBrokerTable;
private final Collection<CellAddressCore> _loginBrokerTable = new ArrayList<>();

private class SleepHandler
{
Expand Down Expand Up @@ -210,11 +211,12 @@ public WebCollectorV3(String name, String args) throws Exception
}

String loginBrokers = _args.getOpt("loginBroker");
if ((loginBrokers != null) && (loginBrokers.length() > 0)) {
_loginBrokerTable = loginBrokers.split(",");
for (String cellName : _loginBrokerTable) {
_log.info("Login Broker : " + cellName);
addQuery(new CellAddressCore(cellName));
if (loginBrokers != null && !loginBrokers.isEmpty()) {
for (String broker : loginBrokers.split(",")) {
_log.info("Login Broker : {}", broker);
CellAddressCore address = new CellAddressCore(broker);
_loginBrokerTable.add(address);
addQuery(address);
}
}
(_senderThread = _nucleus.newThread(this, "sender")).start();
Expand Down Expand Up @@ -286,22 +288,20 @@ private void runSender()
try {
while (!Thread.interrupted()) {
_counter++;
if (_loginBrokerTable != null) {
for (String broker : _loginBrokerTable) {
try {
CellPath path = new CellPath(broker);
_log.debug("Sending LoginBroker query to : " + path);
sendMessage(new CellMessage(path, "ls -binary"));
} catch (NoRouteToCellException ee) {
for (CellAddressCore broker : _loginBrokerTable) {
try {
CellPath path = new CellPath(broker);
_log.debug("Sending LoginBroker query to : {}", path);
sendMessage(new CellMessage(path, "ls -binary"));
} catch (NoRouteToCellException ee) {

}
}
}
//sendMessage(loginBrokerMessage);
synchronized (_infoLock) {
for (CellQueryInfo info : _infoMap.values()) {
try {
_log.debug("Sending query to : " + info.getName());
_log.debug("Sending query to : {}", info.getName());
sendMessage(info.getCellMessage());
} catch (NoRouteToCellException e) {

Expand Down Expand Up @@ -349,13 +349,12 @@ public void messageArrived(CellMessage message)
}

if (reply instanceof dmg.cells.services.login.LoginBrokerInfo[]) {
_log.debug("Login broker reply: {}", ((dmg.cells.services.login.LoginBrokerInfo[])reply).length);
LoginBrokerInfo [] brokerInfos = (LoginBrokerInfo [])reply;
_log.debug("Login broker reply: {}", brokerInfos.length);
synchronized (_infoLock) {
for (LoginBrokerInfo brokerInfo : brokerInfos) {
String dest = brokerInfo.getCellName();
_log.debug("Login broker reports: {}", dest);
if (addQuery(new CellAddressCore(dest))) {
_log.debug("Login broker reports: {}@{}", brokerInfo.getCellName(), brokerInfo.getDomainName());
if (addQuery(new CellAddressCore(brokerInfo.getCellName(), brokerInfo.getDomainName()))) {
modified++;
}
}
Expand Down
Loading

0 comments on commit 327b428

Please sign in to comment.