Skip to content

Commit

Permalink
info: use DGA refresh rate as message timeout
Browse files Browse the repository at this point in the history
Motivation:

DGAs send messages to other dCache cells to discover their current
status.  These messages have a hard-coded one second timeout.

Some queries are data-intensive and could take longer than one second to
build the answer, resulting in no information being provided.

Modification:

DGAs already have the concept of some queries taking longer: the DGA
refresh period.  This period is how long the DGA will wait, after
querying a cell, before querying the cell for updated information.

Therefore, it makes sense to use this DGA refresh period as the timeout
for messages send to other cells.

Result:

The info service will now wait longer for a cell to respond to a query
for information.

Target: master
Requires-notes: yes
Requires-book: no
Request: 9.2
Ticket: https://rt.dcache.org/Ticket/Display.html?id=10592
Patch: https://rb.dcache.org/r/14226/
Acked-by: Tigran Mkrtchyan
Acked-by: Lea Morschel
  • Loading branch information
paulmillar authored and mksahakyan committed Mar 11, 2024
1 parent 58cc1f3 commit 253e07a
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 29 deletions.
Expand Up @@ -48,7 +48,8 @@ public LinkgroupListDga(CellPath spacemanager, int interval, MessageHandlerChain
public void trigger() {
super.trigger();
LOGGER.trace("Sending linkgroup list request message");
_mhc.sendMessage(_metricLifetime, _spacemanager, new GetLinkGroupNamesMessage());
_mhc.sendMessage(_metricLifetime, _spacemanager, triggerPeriod(),
new GetLinkGroupNamesMessage());
}


Expand Down
Expand Up @@ -69,7 +69,8 @@ public void trigger() {
sb.append(" ");
sb.append(item);

_sender.sendMessage(getMetricLifetime(), _handler, _cellPath, sb.toString());
_sender.sendMessage(getMetricLifetime(), _handler, _cellPath,
listRefreshPeriod(), sb.toString());
}

@Override
Expand Down
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,11 +43,6 @@ public class MessageHandlerChain implements MessageMetadataRepository<UOID>,
private static final long METADATA_FLUSH_THRESHOLD = 3600000; // 1 hour
private static final long METADATA_FLUSH_PERIOD = 600000; // 10 minutes

/**
* Our default timeout for sending messages, in milliseconds
*/
private static final long STANDARD_TIMEOUT = 1000;

private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerChain.class);
private final List<MessageHandler> _messageHandler = new LinkedList<>();
private CellEndpoint _endpoint;
Expand Down Expand Up @@ -83,18 +79,19 @@ public String[] listMessageHandlers() {
* @param ttl lifetime of resulting metric, in seconds.
* @param handler the call-back handler for the return message
* @param path the CellPath to target cell
* @param timeout the time to wait for a response, in milliseconds
* @param requestString the String, requesting information
*/
@Override
public void sendMessage(long ttl, CellMessageAnswerable handler,
CellPath path, String requestString) {
CellPath path, long timeout, String requestString) {
if (handler == null) {
LOGGER.error("ignoring attempt to send string-based message without call-back");
return;
}

CellMessage envelope = new CellMessage(path, requestString);
sendMessage(ttl, handler, envelope);
sendMessage(ttl, handler, timeout, envelope);
}


Expand All @@ -103,12 +100,13 @@ public void sendMessage(long ttl, CellMessageAnswerable handler,
*
* @param ttl lifetime of resulting metric, in seconds.
* @param path the CellPath for the recipient of this message
* @param timeout the time to wait for a response, in milliseconds
* @param message the Message payload
*/
@Override
public void sendMessage(long ttl, CellPath path, Message message) {
public void sendMessage(long ttl, CellPath path, long timeout, Message message) {
CellMessage envelope = new CellMessage(path, message);
sendMessage(ttl, null, envelope);
sendMessage(ttl, null, timeout, envelope);
}


Expand All @@ -117,15 +115,16 @@ public void sendMessage(long ttl, CellPath path, Message message) {
*
* @param ttl the metadata for the message
* @param handler the call-back for this method, or null if none should be used.
* @param timeout the time to wait for a response, in milliseconds
* @param envelope the message to send
* @throws SerializationException if the payload isn't serialisable.
*/
@Override
public void sendMessage(long ttl, CellMessageAnswerable handler,
CellMessage envelope) throws SerializationException {
long timeout, CellMessage envelope) throws SerializationException {
putMetricTTL(envelope.getUOID(), ttl);
_endpoint.sendMessage(envelope, handler != null ? handler : this,
MoreExecutors.directExecutor(), STANDARD_TIMEOUT);
MoreExecutors.directExecutor(), timeout);
}

public void setHandlers(List<MessageHandler> handlers) {
Expand Down
Expand Up @@ -19,9 +19,10 @@ public interface MessageSender {
*
* @param ttl how long, in seconds, resulting metrics should last
* @param path the destination for this request
* @param timeout the number of milliseconds to wait for a response.
* @param message the vehicle to send
*/
void sendMessage(long ttl, CellPath path, Message message);
void sendMessage(long ttl, CellPath path, long timeout, Message message);

/**
* Send some arbitrary CellMessage (which includes the payload and the target Cell). The
Expand All @@ -33,9 +34,10 @@ public interface MessageSender {
*
* @param ttl how long, in seconds, resulting metrics should last
* @param handler the object that is to receive reply message
* @param timeout the number of milliseconds to wait for a response.
* @param envelope the complete message envelope to send
*/
void sendMessage(long ttl, CellMessageAnswerable handler,
void sendMessage(long ttl, CellMessageAnswerable handler, long timeout,
CellMessage envelope);

/**
Expand All @@ -49,8 +51,9 @@ void sendMessage(long ttl, CellMessageAnswerable handler,
* @param ttl how long, in seconds, resulting metrics should last
* @param handler the object that is to receive reply message
* @param path the destination for this request
* @param timeout the number of milliseconds to wait for a response.
* @param requestString the String sent to the cell's shell
*/
void sendMessage(long ttl, CellMessageAnswerable handler, CellPath path,
String requestString);
long timeout, String requestString);
}
Expand Up @@ -78,9 +78,11 @@ public void trigger() {
super.trigger();

if (_requestMessage != null) {
_sender.sendMessage(metricLifetime(), null, new CellMessage(_target, _requestMessage));
_sender.sendMessage(metricLifetime(), null, triggerPeriod(),
new CellMessage(_target, _requestMessage));
} else {
_sender.sendMessage(metricLifetime(), _handler, _target, _requestString);
_sender.sendMessage(metricLifetime(), _handler, _target,
triggerPeriod(), _requestString);
}
}

Expand Down
@@ -1,5 +1,7 @@
package org.dcache.services.info.gathers;

import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Set;
import java.util.Stack;
Expand Down Expand Up @@ -55,6 +57,11 @@ public abstract class SkelListBasedActivity implements Schedulable {
*/
private Date _whenListRefresh;

/**
* The current time between successive bursts of messages, in milliseconds.
*/
private long _listRefreshPeriod;

/**
* When we should next send a message
*/
Expand Down Expand Up @@ -110,6 +117,7 @@ protected SkelListBasedActivity(StateExhibitor exhibitor, StatePath parentPath,
updateStack(); // Bring in initial work.

_minimumListRefreshPeriod = minimumListRefreshPeriod;
_listRefreshPeriod = _minimumListRefreshPeriod;
_successiveMsgDelay = successiveMsgDelay;

randomiseDelay(); // Randomise our initial offset.
Expand Down Expand Up @@ -158,16 +166,23 @@ public void trigger() {
* Calculate the earliest we would like to do this again.
*/
long timeToSendAllMsgs = _outstandingWork.size() * _successiveMsgDelay;
long listRefreshPeriod = Math.max(timeToSendAllMsgs, _minimumListRefreshPeriod);
_listRefreshPeriod = Math.max(timeToSendAllMsgs, _minimumListRefreshPeriod);

_whenListRefresh = new Date(System.currentTimeMillis() + listRefreshPeriod);
_whenListRefresh = new Date(System.currentTimeMillis() + _listRefreshPeriod);

/**
* All metrics that are generated should have a lifetime based on when we expect
* to refresh the list and generate more metrics.
* The 2.5 factor allows for both 50% growth and a message being lost.
*/
_metricLifetime = (long) (2.5 * listRefreshPeriod / 1000.0);
_metricLifetime = (long) (2.5 * _listRefreshPeriod / 1000.0);
}

/**
* The number of second between bursts of messages, in milliseconds.
*/
public long listRefreshPeriod() {
return _listRefreshPeriod;
}


Expand Down
Expand Up @@ -28,6 +28,14 @@ public SkelPeriodicActivity(long period) {
_period)));
}

/**
* The duration between successive triggers.
* @return duration in milliseconds.
*/
public long triggerPeriod() {
return TimeUnit.SECONDS.toMillis(_period);
}

@Override
public Date shouldNextBeTriggered() {
return new Date(_nextTrigger.getTime());
Expand Down
Expand Up @@ -52,7 +52,8 @@ public void trigger() {

LOGGER.info("sending message getcellinfos to System cell on domain {}", domainName);

_sender.sendMessage(getMetricLifetime(), _handler, systemCellPath, "getcellinfos");
_sender.sendMessage(getMetricLifetime(), _handler, systemCellPath,
listRefreshPeriod(), "getcellinfos");
}

/**
Expand Down
Expand Up @@ -51,7 +51,7 @@ public void trigger() {
COMMAND, domain);

_sender.sendMessage(getMetricLifetime(), _handler, path,
COMMAND);
listRefreshPeriod(), COMMAND);
}
}

Expand Down
Expand Up @@ -66,7 +66,8 @@ public void trigger() {
CellPath routingMgrCellPath = new CellPath("RoutingMgr", domainName);

LOGGER.info("sending message to RoutingMgr cell on domain {}", domainName);
_sender.sendMessage(getMetricLifetime(), _handler, routingMgrCellPath, "ls -x");
_sender.sendMessage(getMetricLifetime(), _handler, routingMgrCellPath,
listRefreshPeriod(), "ls -x");
}

/**
Expand Down
Expand Up @@ -57,7 +57,8 @@ public void trigger() {

LOGGER.trace("Sending linkgroup details request message");

_sender.sendMessage(_metricLifetime, _spacemanager, new GetLinkGroupsMessage());
_sender.sendMessage(_metricLifetime, _spacemanager, triggerPeriod(),
new GetLinkGroupsMessage());
}


Expand Down
Expand Up @@ -56,7 +56,8 @@ public void trigger() {

LOGGER.trace("Sending space token details request message");

_sender.sendMessage(_metricLifetime, _spacemanager, new GetSpaceTokensMessage(true));
_sender.sendMessage(_metricLifetime, _spacemanager, triggerPeriod(),
new GetSpaceTokensMessage(true));
}

@Override
Expand Down
Expand Up @@ -95,7 +95,7 @@ public void testSendMessageEnvelope() {
};
CellMessage msg = new CellMessage(dest, obj);

_sender.sendMessage(10, new CactusCellMessageAnswerable(), msg);
_sender.sendMessage(10, new CactusCellMessageAnswerable(), 1000, msg);

List<CellMessage> sentMsgs = _endpoint.getSentMessages();

Expand All @@ -112,7 +112,7 @@ public void testSendMessage() {
CellPath dest = new CellPath("test-cell", "test-domain");
Message vehicle = new Message();

_sender.sendMessage(10, dest, vehicle);
_sender.sendMessage(10, dest, 1000, vehicle);

List<CellMessage> sentMsgs = _endpoint.getSentMessages();

Expand All @@ -129,7 +129,7 @@ public void testSendStringMessage() {
CellPath dest = new CellPath("test-cell", "test-domain");
String request = "get all data";

_sender.sendMessage(10, new CactusCellMessageAnswerable(), dest, request);
_sender.sendMessage(10, new CactusCellMessageAnswerable(), dest, 1000, request);

List<CellMessage> sentMsgs = _endpoint.getSentMessages();

Expand Down

0 comments on commit 253e07a

Please sign in to comment.