Skip to content

Commit

Permalink
doors: Include IO queue in pool selection request
Browse files Browse the repository at this point in the history
Motivation:

Pool manager contains 'magic' to adjust cost estimates when selecting a pool.
This magic relies on the pool manager knowing which queue the request will be
submitted to. Most doors, however, do not include this information in the
request.

Modification:

Include the IO queue in the pool selection request.

Result:

Fixed a bug that caused the pool manager to adjust cost estimates on the wrong
mover queue when selecting pools.

Target: trunk
Require-notes: yes
Require-book: no
Request: 2.16
Acked-by: Paul Millar <paul.millar@desy.de>

Reviewed at https://rb.dcache.org/r/9708/
  • Loading branch information
gbehrmann committed Sep 7, 2016
1 parent a29ba61 commit d56eee2
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 29 deletions.
Expand Up @@ -672,6 +672,7 @@ public FtpTransfer(FsPath path,
setPoolStub(_poolStub);
setBillingStub(_billingStub);
setAllocation(_allo);
setIoQueue(_settings.getIoQueueName());

_offset = offset;
_size = size;
Expand Down Expand Up @@ -3084,7 +3085,7 @@ private void retrieve(String file, long offset, long size,
* transfer a few times.
*/
transfer.createAdapter();
transfer.selectPoolAndStartMoverAsync(_settings.getIoQueueName(), _readRetryPolicy);
transfer.selectPoolAndStartMoverAsync(_readRetryPolicy);
} catch (PermissionDeniedCacheException e) {
transfer.abort(550, "Permission denied");
} catch (CacheException e) {
Expand Down Expand Up @@ -3208,7 +3209,7 @@ private void store(String file, Mode mode, String xferMode,
}

transfer.createAdapter();
transfer.selectPoolAndStartMoverAsync(_settings.getIoQueueName(), _writeRetryPolicy);
transfer.selectPoolAndStartMoverAsync(_writeRetryPolicy);
} catch (IOException e) {
transfer.abort(451, "Operation failed: " + e.getMessage());
} catch (PermissionDeniedCacheException e) {
Expand Down
Expand Up @@ -496,6 +496,7 @@ public Layout layoutGet(CompoundContext context, Inode nfsInode, int layoutType,
transfer.setPoolManagerStub(_poolManagerStub);
transfer.setPnfsId(pnfsId);
transfer.setClientAddress(remote);
transfer.setIoQueue(_ioQueue);

/*
* Bind transfer to open-state.
Expand Down Expand Up @@ -536,7 +537,7 @@ public Layout layoutGet(CompoundContext context, Inode nfsInode, int layoutType,
throw new NfsIoException("lost file " + inode.getId());
}

PoolDS ds = transfer.getPoolDataServer(_ioQueue, NFS_REQUEST_BLOCKING);
PoolDS ds = transfer.getPoolDataServer(NFS_REQUEST_BLOCKING);
deviceid = ds.getDeviceId();
}
}
Expand Down Expand Up @@ -895,7 +896,7 @@ Inode getInode() {
return _nfsInode;
}

PoolDS getPoolDataServer(String queue, long timeout) throws
PoolDS getPoolDataServer(long timeout) throws
InterruptedException, ExecutionException,
TimeoutException, CacheException {

Expand All @@ -910,10 +911,10 @@ PoolDS getPoolDataServer(String queue, long timeout) throws

_log.debug("looking for {} pool for {}", (isWrite() ? "write" : "read"), getPnfsId());

_redirectFuture = selectPoolAndStartMoverAsync(queue, RETRY_POLICY);
_redirectFuture = selectPoolAndStartMoverAsync(RETRY_POLICY);
} else {
// we may re-send the request, but pool will handle it
_redirectFuture = startMoverAsync(queue, NFS_REQUEST_BLOCKING);
_redirectFuture = startMoverAsync(NFS_REQUEST_BLOCKING);
}
}
}
Expand Down
Expand Up @@ -668,7 +668,7 @@ public DcacheResource createFile(FsPath path, InputStream inputStream, Long leng
try {
transfer.setLength(length);
try {
transfer.selectPoolAndStartMover(_ioQueue, _retryPolicy);
transfer.selectPoolAndStartMover(_retryPolicy);
String uri = transfer.waitForRedirect(_moverTimeout, _moverTimeoutUnit);
if (uri == null) {
throw new TimeoutCacheException("Server is busy (internal timeout)");
Expand Down Expand Up @@ -716,7 +716,7 @@ public String getWriteUrl(FsPath path, Long length)
transfer.createNameSpaceEntry();
try {
transfer.setLength(length);
transfer.selectPoolAndStartMover(_ioQueue, _retryPolicy);
transfer.selectPoolAndStartMover(_retryPolicy);
uri = transfer.waitForRedirect(_moverTimeout, _moverTimeoutUnit);
if (uri == null) {
throw new TimeoutCacheException("Server is busy (internal timeout)");
Expand Down Expand Up @@ -1055,7 +1055,7 @@ private ReadTransfer beginRead(FsPath path, PnfsId pnfsid, boolean isProxyTransf
transfer.setProxyTransfer(isProxyTransfer);
transfer.readNameSpaceEntry(false);
try {
transfer.selectPoolAndStartMover(_ioQueue, _retryPolicy);
transfer.selectPoolAndStartMover(_retryPolicy);
uri = transfer.waitForRedirect(_moverTimeout, _moverTimeoutUnit);
if (uri == null) {
throw new TimeoutCacheException("Server is busy (internal timeout)");
Expand Down Expand Up @@ -1208,6 +1208,7 @@ private void initializeTransfer(HttpTransfer transfer, Subject subject)
transfer.setPoolManagerStub(_poolManagerStub);
transfer.setPoolStub(_poolStub);
transfer.setBillingStub(_billingStub);
transfer.setIoQueue(_ioQueue);
List<InetSocketAddress> addresses = Subjects.getOrigin(subject).getClientChain().stream().
map(a -> new InetSocketAddress(a, PROTOCOL_INFO_UNKNOWN_PORT)).
collect(Collectors.toList());
Expand Down
Expand Up @@ -313,7 +313,7 @@ public void getInfo(PrintWriter pw)

private XrootdTransfer
createTransfer(InetSocketAddress client, FsPath path,
UUID uuid, InetSocketAddress local, Subject subject,
String ioQueue, UUID uuid, InetSocketAddress local, Subject subject,
Restriction restriction)
{
XrootdTransfer transfer =
Expand Down Expand Up @@ -344,6 +344,7 @@ public synchronized void finished(CacheException error)
transfer.setClientAddress(client);
transfer.setUUID(uuid);
transfer.setDoorAddress(local);
transfer.setIoQueue(ioQueue == null ? _ioQueue : ioQueue);
transfer.setFileHandle(_handleCounter.getAndIncrement());
return transfer;
}
Expand All @@ -358,15 +359,15 @@ public synchronized void finished(CacheException error)
}

XrootdTransfer transfer =
createTransfer(client, path, uuid, local, subject, restriction);
createTransfer(client, path, ioQueue, uuid, local, subject, restriction);
int handle = transfer.getFileHandle();

InetSocketAddress address = null;
_transfers.put(handle, transfer);
String explanation = "unspecified problem";
try {
transfer.readNameSpaceEntry(false);
transfer.selectPoolAndStartMover(ioQueue == null ? _ioQueue : ioQueue, RETRY_POLICY);
transfer.selectPoolAndStartMover(RETRY_POLICY);
address = transfer.waitForRedirect(_moverTimeout, _moverTimeoutUnit);
if (address == null) {
throw new CacheException(transfer.getPool() + " failed to open TCP socket");
Expand Down Expand Up @@ -408,7 +409,7 @@ public synchronized void finished(CacheException error)
}

XrootdTransfer transfer =
createTransfer(client, path, uuid, local, subject, restriction);
createTransfer(client, path, ioQueue, uuid, local, subject, restriction);
transfer.setOverwriteAllowed(overwrite);
int handle = transfer.getFileHandle();
InetSocketAddress address = null;
Expand All @@ -424,7 +425,7 @@ public synchronized void finished(CacheException error)
transfer.setLength(size);
}
try {
transfer.selectPoolAndStartMover(ioQueue == null ? _ioQueue : ioQueue, RETRY_POLICY);
transfer.selectPoolAndStartMover(RETRY_POLICY);

address = transfer.waitForRedirect(_moverTimeout, _moverTimeoutUnit);
if (address == null) {
Expand Down
Expand Up @@ -333,13 +333,15 @@ private void copy(Subject subject,
_source.setPoolManagerStub(_poolManager);
_source.setPoolStub(_poolStub);
_source.setCellAddress(getCellAddress());
_source.setIoQueue("p2p");
// _source.setClientAddress();
// _source.setBillingStub();
// _source.setCheckStagePermission();

_target.setPoolManagerStub(_poolManager);
_target.setPoolStub(_poolStub);
_target.setCellAddress(getCellAddress());
_target.setIoQueue("pp");
// _target.setClientAddress();
// _target.setBillingStub();

Expand All @@ -355,11 +357,11 @@ private void copy(Subject subject,

_target.setProtocolInfo(createTargetProtocolInfo(_target));
_target.setLength(_source.getLength());
_target.selectPoolAndStartMover("pp", TransferRetryPolicies.tryOncePolicy());
_target.selectPoolAndStartMover(TransferRetryPolicies.tryOncePolicy());
_target.waitForRedirect(timeout);

_source.setProtocolInfo(createSourceProtocolInfo(_target.getRedirect(), _target.getId()));
_source.selectPoolAndStartMover("p2p", TransferRetryPolicies.tryOncePolicy());
_source.selectPoolAndStartMover(TransferRetryPolicies.tryOncePolicy());

if (!_source.waitForMover(timeout)) {
throw new TimeoutCacheException("copy: wait for DoorTransferFinishedMessage expired");
Expand Down
Expand Up @@ -52,9 +52,9 @@ public AsynchronousRedirectedTransfer(Executor executor, PnfsHandler pnfs, Subje
}

@Override
public ListenableFuture<Void> selectPoolAndStartMoverAsync(String queue, TransferRetryPolicy policy)
public ListenableFuture<Void> selectPoolAndStartMoverAsync(TransferRetryPolicy policy)
{
return monitor.setQueueFuture(super.selectPoolAndStartMoverAsync(queue, policy));
return monitor.setQueueFuture(super.selectPoolAndStartMoverAsync(policy));
}

/**
Expand Down
50 changes: 39 additions & 11 deletions modules/dcache/src/main/java/org/dcache/util/Transfer.java
Expand Up @@ -13,6 +13,7 @@
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import javax.annotation.Nullable;
import javax.security.auth.Subject;

import java.io.IOException;
Expand Down Expand Up @@ -116,6 +117,7 @@ public class Transfer implements Comparable<Transfer>
private ProtocolInfo _protocolInfo;
private boolean _isWrite;
private List<InetSocketAddress> _clientAddresses;
private String _ioQueue;

private long _allocated;

Expand Down Expand Up @@ -169,7 +171,7 @@ public Transfer(PnfsHandler pnfs, Subject subject, Restriction restriction, FsPa
*/
protected ProtocolInfo getProtocolInfoForPoolManager()
{
checkNotNull(_protocolInfo);
checkState(_protocolInfo != null);
return _protocolInfo;
}

Expand All @@ -179,7 +181,7 @@ protected ProtocolInfo getProtocolInfoForPoolManager()
*/
protected ProtocolInfo getProtocolInfoForPool()
{
checkNotNull(_protocolInfo);
checkState(_protocolInfo != null);
return _protocolInfo;
}

Expand All @@ -194,6 +196,7 @@ public synchronized void setProtocolInfo(ProtocolInfo info)
/**
* Returns the ProtocolInfo used for the transfer. May be null.
*/
@Nullable
public synchronized ProtocolInfo getProtocolInfo()
{
return _protocolInfo;
Expand Down Expand Up @@ -288,6 +291,7 @@ public void setStatusUntil(String status, ListenableFuture<?> future)
/**
* Returns the current status of a pool. May be null.
*/
@Nullable
public synchronized String getStatus()
{
return _status;
Expand Down Expand Up @@ -340,6 +344,7 @@ public synchronized String getBillingPath()
/**
* Returns the PnfsId of the file to be transferred.
*/
@Nullable
public synchronized PnfsId getPnfsId()
{
return _fileAttributes.isDefined(PNFSID) ? _fileAttributes.getPnfsId() : null;
Expand Down Expand Up @@ -383,6 +388,7 @@ public synchronized void setMoverId(Integer moverId)
/**
* Returns the ID of the mover of this transfer.
*/
@Nullable
public synchronized Integer getMoverId()
{
return _moverId;
Expand All @@ -408,6 +414,7 @@ public synchronized void setPool(String pool)
/**
* Returns the pool to use for this transfer.
*/
@Nullable
public synchronized String getPool()
{
return _poolName;
Expand All @@ -424,6 +431,7 @@ public synchronized void setPoolAddress(CellAddressCore poolAddress)
/**
* Returns the address of the pool to use for this transfer.
*/
@Nullable
public synchronized CellAddressCore getPoolAddress()
{
return _poolAddress;
Expand Down Expand Up @@ -529,6 +537,7 @@ public synchronized void setCellAddress(CellAddressCore address)
*/
public synchronized String getCellName()
{
// Guard: _cellAddress is dereferenced on the next line, so reject the call
// while it is still null instead of failing on the dereference itself.
// NOTE(review): checkState looks like Guava's Preconditions.checkState, which
// would raise IllegalStateException here — confirm against the static imports.
checkState(_cellAddress != null);
return _cellAddress.getCellName();
}

Expand All @@ -537,6 +546,7 @@ public synchronized String getCellName()
*/
public synchronized String getDomainName()
{
// Guard: _cellAddress is dereferenced on the next line, so reject the call
// while it is still null instead of failing on the dereference itself.
// NOTE(review): checkState looks like Guava's Preconditions.checkState, which
// would raise IllegalStateException here — confirm against the static imports.
checkState(_cellAddress != null);
return _cellAddress.getCellDomainName();
}

Expand Down Expand Up @@ -566,6 +576,7 @@ public synchronized void setClientAddresses(List<InetSocketAddress> addresses)
* Report the address of the client that connected to dCache when
* initiating this transfer.
*/
@Nullable
public synchronized InetSocketAddress getClientAddress()
{
return _clientAddresses == null ? null : _clientAddresses.get(0);
Expand All @@ -577,6 +588,7 @@ public synchronized InetSocketAddress getClientAddress()
* earlier in the list represent relay clients. The first item is the
* client that directly connected to dCache.
*/
@Nullable
public synchronized List<InetSocketAddress> getClientAddresses()
{
return _clientAddresses;
Expand Down Expand Up @@ -852,6 +864,23 @@ public synchronized void setAllocation(long length)
_allocated = length;
}

/**
* Sets the mover queue to use.
*
* The stored value is what this class passes as the IO queue name when it
* builds pool selection requests and the mover start message, so it should
* be set before pool selection begins.
*
* @param queue name of the mover queue; stored as given, so a null value
*              leaves the transfer without an explicit queue
*/
public synchronized void setIoQueue(String queue)
{
_ioQueue = queue;
}

/**
* Returns the mover queue to be used.
*
* @return the queue name most recently stored by {@code setIoQueue}, or
*         null if no queue has been set
*/
@Nullable
public synchronized String getIoQueue()
{
return _ioQueue;
}

/**
* Returns the read pool selection context.
*/
Expand Down Expand Up @@ -891,6 +920,7 @@ public ListenableFuture<Void> selectPoolAsync(long timeout)
request.setSubject(_subject);
request.setBillingPath(getBillingPath());
request.setTransferPath(getTransferPath());
request.setIoQueueName(getIoQueue());

ListenableFuture<PoolMgrSelectWritePoolMsg> reply = _poolManager.sendAsync(request, timeout);
setStatusUntil("PoolManager: Selecting pool", reply);
Expand Down Expand Up @@ -922,6 +952,7 @@ public ListenableFuture<Void> selectPoolAsync(long timeout)
request.setSubject(_subject);
request.setBillingPath(getBillingPath());
request.setTransferPath(getTransferPath());
request.setIoQueueName(getIoQueue());

ListenableFuture<PoolMgrSelectReadPoolMsg> reply = _poolManager.sendAsync(request, timeout);
setStatusUntil("PoolManager: Selecting pool", reply);
Expand All @@ -938,10 +969,8 @@ public ListenableFuture<Void> selectPoolAsync(long timeout)

/**
* Creates a mover for the transfer.
*
* @param queue The mover queue of the transfer; may be null
*/
public ListenableFuture<Void> startMoverAsync(String queue, long timeout)
public ListenableFuture<Void> startMoverAsync(long timeout)
{
FileAttributes fileAttributes = getFileAttributes();
String pool = getPool();
Expand All @@ -965,7 +994,7 @@ public ListenableFuture<Void> startMoverAsync(String queue, long timeout)
}
message.setBillingPath(getBillingPath());
message.setTransferPath(getTransferPath());
message.setIoQueueName(queue);
message.setIoQueueName(getIoQueue());
message.setInitiator(getTransaction());
message.setId(_id);
message.setSubject(_subject);
Expand Down Expand Up @@ -1136,25 +1165,24 @@ private static long getTimeoutFor(PnfsHandler pnfs, long deadline)
* according to the {@link TransferRetryPolicy}. Note that there
* will be no retries on uploads.
*
* @param queue where mover should be started
* @param policy to handle error cases
* @throws CacheException
* @throws InterruptedException
*/
public void selectPoolAndStartMover(String queue, TransferRetryPolicy policy)
public void selectPoolAndStartMover(TransferRetryPolicy policy)
throws CacheException, InterruptedException
{
getCancellable(selectPoolAndStartMoverAsync(queue, policy));
getCancellable(selectPoolAndStartMoverAsync(policy));
}

public ListenableFuture<Void> selectPoolAndStartMoverAsync(String queue, TransferRetryPolicy policy)
public ListenableFuture<Void> selectPoolAndStartMoverAsync(TransferRetryPolicy policy)
{
long deadLine = addWithInfinity(System.currentTimeMillis(), policy.getTotalTimeOut());

AsyncFunction<Void, Void> selectPool =
ignored -> selectPoolAsync(getTimeoutFor(deadLine));
AsyncFunction<Void, Void> startMover =
ignored -> startMoverAsync(queue, getTimeoutFor(deadLine));
ignored -> startMoverAsync(getTimeoutFor(deadLine));
AsyncFunction<Void, Void> readNameSpaceEntry =
ignored -> readNameSpaceEntryAsync(false, getTimeoutFor(_pnfs, deadLine));

Expand Down

0 comments on commit d56eee2

Please sign in to comment.