Skip to content

Commit

Permalink
Merge pull request #7556 from kofemann/issue-7548-9.2
Browse files Browse the repository at this point in the history
webdav: use transfermanager+id to identify TPC transfer
  • Loading branch information
svemeyer committed Apr 25, 2024
2 parents b0e6eaf + 5cf1c75 commit 3b0253d
Showing 1 changed file with 27 additions and 6 deletions.
@@ -1,6 +1,6 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2014-2022 Deutsches Elektronen-Synchrotron
* Copyright (C) 2014-2024 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -59,6 +59,7 @@
import diskCacheV111.vehicles.IoDoorEntry;
import diskCacheV111.vehicles.IoJobInfo;
import diskCacheV111.vehicles.IpProtocolInfo;
import diskCacheV111.vehicles.Message;
import diskCacheV111.vehicles.PnfsCreateEntryMessage;
import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo;
import diskCacheV111.vehicles.RemoteHttpsDataTransferProtocolInfo;
Expand Down Expand Up @@ -268,7 +269,7 @@ public enum TransferFlag {
private static final Duration CELL_MESSAGE_LATENCY = Duration.of(2, MINUTES);

private final Queue<Duration> _messageQueueTime = EvictingQueue.<Duration>create(MESSAGE_QUEUE_HISTORY);
private final Map<Long, RemoteTransfer> _transfers = new ConcurrentHashMap<>();
private final Map<String, RemoteTransfer> _transfers = new ConcurrentHashMap<>();

private long _performanceMarkerPeriod;
private CellStub _genericTransferManager;
Expand Down Expand Up @@ -683,17 +684,37 @@ public ListenableFuture<Optional<String>> acceptRequest(
return transfer.start();
}

/**
* Create a transfer id based on envelope source and message id.
* @param envelope message envelope.
* @param message message.
* @return transfer id.
*/
private static String getTransferId(CellMessage envelope, Message message) {
return envelope.getSourceAddress() + "-" +message.getId();
}

/**
* Create a transfer id based on cell stub destination and the specified id.
* @param cellStub cellStub to use.
* @param id message id.
* @return transfer id.
*/
private static String getTransferId(CellStub cellStub, long id) {
return cellStub.getDestinationPath().toAddressString() + "-" + id;
}

public void messageArrived(CellMessage envelope, TransferCompleteMessage message) {
messageArrived(Duration.of(envelope.getLocalAge(), MILLIS));
RemoteTransfer transfer = _transfers.get(message.getId());
RemoteTransfer transfer = _transfers.get(getTransferId(envelope, message));
if (transfer != null) {
_activity.execute(() -> transfer.completed(null));
}
}

public void messageArrived(CellMessage envelope, TransferFailedMessage message) {
messageArrived(Duration.of(envelope.getLocalAge(), MILLIS));
RemoteTransfer transfer = _transfers.get(message.getId());
RemoteTransfer transfer = _transfers.get(getTransferId(envelope, message));
if (transfer != null) {
String error = String.valueOf(message.getErrorObject());
_activity.execute(() -> transfer.completed(error));
Expand Down Expand Up @@ -924,7 +945,7 @@ public synchronized ListenableFuture<Optional<String>> start()
_transferManager = _genericTransferManager.withDestination(path);
}
_id = response.getId();
_transfers.put(_id, this);
_transfers.put(getTransferId(_transferManager, _id), this);
addDigestResponseHeader(attributes);
} catch (NoRouteToCellException | TimeoutCacheException e) {
LOGGER.error("Failed to send request to transfer manager: {}", e.getMessage());
Expand Down Expand Up @@ -1114,7 +1135,7 @@ private void addDigestResponseHeader(FileAttributes attributes) {
}

private void completed(String transferError) {
if (_transfers.remove(_id) == null) {
if (_transfers.remove(getTransferId(_transferManager, _id)) == null) {
// Something else called complete, so do nothing.
return;
}
Expand Down

0 comments on commit 3b0253d

Please sign in to comment.