Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
doors: Include IO queue in pool selection request
Motivation:

Pool manager contains 'magic' to adjust cost estimates when selecting a pool.
This magic relies on the pool manager knowing which queue the request will be
submitted to. Most doors do however not include this information in the
request.

Modification:

Include the io queue in the pool selection request.

Result:

Fixed a bug that cost pool manager to adjust cost estimates on the wrong
mover queue when selecting pools.

Target: trunk
Require-notes: yes
Require-book: no
Request: 2.16
Acked-by: Paul Millar <paul.millar@desy.de>

Reviewed at https://rb.dcache.org/r/9708/

(cherry picked from commit d56eee2)
  • Loading branch information
gbehrmann committed Sep 7, 2016
1 parent fa2b249 commit d81194b
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 36 deletions.
Expand Up @@ -665,6 +665,7 @@ public FtpTransfer(FsPath path,
setPoolStub(_poolStub);
setBillingStub(_billingStub);
setAllocation(_allo);
setIoQueue(_settings.getIoQueueName());

_offset = offset;
_size = size;
Expand Down Expand Up @@ -3072,7 +3073,7 @@ private void retrieve(String file, long offset, long size,
* transfer a few times.
*/
transfer.createAdapter();
transfer.selectPoolAndStartMoverAsync(_settings.getIoQueueName(), _readRetryPolicy);
transfer.selectPoolAndStartMoverAsync(_readRetryPolicy);
} catch (PermissionDeniedCacheException e) {
transfer.abort(550, "Permission denied");
} catch (CacheException e) {
Expand Down Expand Up @@ -3196,7 +3197,7 @@ private void store(String file, Mode mode, String xferMode,
}

transfer.createAdapter();
transfer.selectPoolAndStartMoverAsync(_settings.getIoQueueName(), _writeRetryPolicy);
transfer.selectPoolAndStartMoverAsync(_writeRetryPolicy);
} catch (IOException e) {
transfer.abort(451, "Operation failed: " + e.getMessage());
} catch (PermissionDeniedCacheException e) {
Expand Down
Expand Up @@ -469,6 +469,7 @@ public Layout layoutGet(CompoundContext context, Inode nfsInode, int layoutType,
transfer.setPoolManagerStub(_poolManagerStub);
transfer.setPnfsId(pnfsId);
transfer.setClientAddress(remote);
transfer.setIoQueue(_ioQueue);

/*
* Bind transfer to open-state.
Expand Down Expand Up @@ -509,7 +510,7 @@ public Layout layoutGet(CompoundContext context, Inode nfsInode, int layoutType,
throw new NfsIoException("lost file " + inode.getId());
}

PoolDS ds = transfer.getPoolDataServer(_ioQueue, NFS_REQUEST_BLOCKING);
PoolDS ds = transfer.getPoolDataServer(NFS_REQUEST_BLOCKING);
deviceid = ds.getDeviceId();
}
}
Expand Down Expand Up @@ -859,7 +860,7 @@ Inode getInode() {
return _nfsInode;
}

PoolDS getPoolDataServer(String queue, long timeout) throws
PoolDS getPoolDataServer(long timeout) throws
InterruptedException, ExecutionException,
TimeoutException, CacheException {

Expand All @@ -874,10 +875,10 @@ PoolDS getPoolDataServer(String queue, long timeout) throws

_log.debug("looking for {} pool for {}", (isWrite() ? "write" : "read"), getPnfsId());

_redirectFuture = selectPoolAndStartMoverAsync(queue, RETRY_POLICY);
_redirectFuture = selectPoolAndStartMoverAsync(RETRY_POLICY);
} else {
// we may re-send the request, but pool will handle it
_redirectFuture = startMoverAsync(queue, NFS_REQUEST_BLOCKING);
_redirectFuture = startMoverAsync(NFS_REQUEST_BLOCKING);
}
}
}
Expand Down
Expand Up @@ -662,7 +662,7 @@ public DcacheResource createFile(FsPath path, InputStream inputStream, Long leng
try {
transfer.setLength(length);
try {
transfer.selectPoolAndStartMover(_ioQueue, _retryPolicy);
transfer.selectPoolAndStartMover(_retryPolicy);
String uri = transfer.waitForRedirect(_moverTimeout, _moverTimeoutUnit);
if (uri == null) {
throw new TimeoutCacheException("Server is busy (internal timeout)");
Expand Down Expand Up @@ -709,7 +709,7 @@ public String getWriteUrl(FsPath path, Long length)
transfer.createNameSpaceEntry();
try {
transfer.setLength(length);
transfer.selectPoolAndStartMover(_ioQueue, _retryPolicy);
transfer.selectPoolAndStartMover(_retryPolicy);
uri = transfer.waitForRedirect(_moverTimeout, _moverTimeoutUnit);
if (uri == null) {
throw new TimeoutCacheException("Server is busy (internal timeout)");
Expand Down Expand Up @@ -1039,7 +1039,7 @@ private ReadTransfer beginRead(FsPath path, PnfsId pnfsid, boolean isProxyTransf
transfer.setProxyTransfer(isProxyTransfer);
transfer.readNameSpaceEntry(false);
try {
transfer.selectPoolAndStartMover(_ioQueue, _retryPolicy);
transfer.selectPoolAndStartMover(_retryPolicy);
uri = transfer.waitForRedirect(_moverTimeout, _moverTimeoutUnit);
if (uri == null) {
throw new TimeoutCacheException("Server is busy (internal timeout)");
Expand Down Expand Up @@ -1188,6 +1188,7 @@ private void initializeTransfer(HttpTransfer transfer, Subject subject)
transfer.setPoolManagerStub(_poolManagerStub);
transfer.setPoolStub(_poolStub);
transfer.setBillingStub(_billingStub);
transfer.setIoQueue(_ioQueue);
List<InetSocketAddress> addresses = Subjects.getOrigin(subject).getClientChain().stream().
map(a -> new InetSocketAddress(a, PROTOCOL_INFO_UNKNOWN_PORT)).
collect(Collectors.toList());
Expand Down
Expand Up @@ -343,6 +343,7 @@ public synchronized void finished(CacheException error)
transfer.setClientAddress(client);
transfer.setUUID(uuid);
transfer.setDoorAddress(local);
transfer.setIoQueue(_ioQueue);
transfer.setFileHandle(_handleCounter.getAndIncrement());
return transfer;
}
Expand All @@ -364,7 +365,7 @@ public synchronized void finished(CacheException error)
_transfers.put(handle, transfer);
try {
transfer.readNameSpaceEntry(false);
transfer.selectPoolAndStartMover(_ioQueue, RETRY_POLICY);
transfer.selectPoolAndStartMover(RETRY_POLICY);
address = transfer.waitForRedirect(_moverTimeout, _moverTimeoutUnit);
if (address == null) {
throw new CacheException(transfer.getPool() + " failed to open TCP socket");
Expand Down Expand Up @@ -415,7 +416,7 @@ public synchronized void finished(CacheException error)
transfer.createNameSpaceEntry();
}
try {
transfer.selectPoolAndStartMover(_ioQueue, RETRY_POLICY);
transfer.selectPoolAndStartMover(RETRY_POLICY);

address = transfer.waitForRedirect(_moverTimeout, _moverTimeoutUnit);
if (address == null) {
Expand Down
Expand Up @@ -335,6 +335,7 @@ private void copy(Subject subject,
_source.setPoolStub(_poolStub);
_source.setDomainName(getCellDomainName());
_source.setCellName(getCellName());
_source.setIoQueue("p2p");
// _source.setClientAddress();
// _source.setBillingStub();
// _source.setCheckStagePermission();
Expand All @@ -343,6 +344,7 @@ private void copy(Subject subject,
_target.setPoolStub(_poolStub);
_target.setDomainName(getCellDomainName());
_target.setCellName(getCellName());
_target.setIoQueue("pp");
// _target.setClientAddress();
// _target.setBillingStub();

Expand All @@ -357,11 +359,11 @@ private void copy(Subject subject,

_target.setProtocolInfo(createTargetProtocolInfo(_target));
_target.setLength(_source.getLength());
_target.selectPoolAndStartMover("pp", new TransferRetryPolicy(1, 0, _poolManager.getTimeoutInMillis()));
_target.selectPoolAndStartMover(new TransferRetryPolicy(1, 0, _poolManager.getTimeoutInMillis()));
_target.waitForRedirect(timeout);

_source.setProtocolInfo(createSourceProtocolInfo(_target.getRedirect(), _target.getId()));
_source.selectPoolAndStartMover("p2p", new TransferRetryPolicy(1, 0, _poolManager.getTimeoutInMillis()));
_source.selectPoolAndStartMover(new TransferRetryPolicy(1, 0, _poolManager.getTimeoutInMillis()));

if (!_source.waitForMover(timeout)) {
throw new TimeoutCacheException("copy: wait for DoorTransferFinishedMessage expired");
Expand Down
Expand Up @@ -52,9 +52,9 @@ public AsynchronousRedirectedTransfer(Executor executor, PnfsHandler pnfs, Subje
}

@Override
public ListenableFuture<Void> selectPoolAndStartMoverAsync(String queue, TransferRetryPolicy policy)
public ListenableFuture<Void> selectPoolAndStartMoverAsync(TransferRetryPolicy policy)
{
return monitor.setQueueFuture(super.selectPoolAndStartMoverAsync(queue, policy));
return monitor.setQueueFuture(super.selectPoolAndStartMoverAsync(policy));
}

/**
Expand Down

0 comments on commit d81194b

Please sign in to comment.