Skip to content

Commit

Permalink
webdav: use transfermanager+id to identify TPC transfer
Browse files Browse the repository at this point in the history
Motivation:
The WebDAV door keeps track of TPC transfers based on transfer IDs
generated by RTM. As RTM generates transfer ID based on the current
timestamp, in deployments were multiple RTM are running, for two transfers
that have started at the same point in time (with millisecond precision),
then one of them will be lost. As soon as a first one completes, the second
transfer becomes orphan:

```
Perf Marker
    Timestamp: 1713527368
    State: Running
    State description: Mover created
    Stripe Index: 0
    Stripe Start Time: 1713527348
    Stripe Last Transferred: 1713527348
    Stripe Transfer Time: 19
    Stripe Bytes Transferred: 0
    Stripe Status: RUNNING
    Total Stripe Count: 1
    RemoteConnections: tcp:127.0.0.1:9000
End
Perf Marker
    Timestamp: 1713527373
    State: Unknown transfer
    State description: Unknown transfer
    Stripe Index: 0
    Total Stripe Count: 1
End
```

Modification:
Update RemoteTransferHandler to use `transfermanager+id` as transfer
identity to avoid this ambiguity.

Result:
no transfer id collisions

Fixes: #7548
Acked-by: Marina Sahakyan
Acked-by: Svenja Meyer
Acked-by: Dmitry Lirvintsev
Target: master, 10.0, 9.2
Require-book: no
Require-notes: yes
(cherry picked from commit 412bfe2)
Signed-off-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
  • Loading branch information
kofemann committed Apr 22, 2024
1 parent 25fe810 commit 5cf1c75
Showing 1 changed file with 27 additions and 6 deletions.
@@ -1,6 +1,6 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2014-2022 Deutsches Elektronen-Synchrotron
* Copyright (C) 2014-2024 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -59,6 +59,7 @@
import diskCacheV111.vehicles.IoDoorEntry;
import diskCacheV111.vehicles.IoJobInfo;
import diskCacheV111.vehicles.IpProtocolInfo;
import diskCacheV111.vehicles.Message;
import diskCacheV111.vehicles.PnfsCreateEntryMessage;
import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo;
import diskCacheV111.vehicles.RemoteHttpsDataTransferProtocolInfo;
Expand Down Expand Up @@ -268,7 +269,7 @@ public enum TransferFlag {
private static final Duration CELL_MESSAGE_LATENCY = Duration.of(2, MINUTES);

private final Queue<Duration> _messageQueueTime = EvictingQueue.<Duration>create(MESSAGE_QUEUE_HISTORY);
private final Map<Long, RemoteTransfer> _transfers = new ConcurrentHashMap<>();
private final Map<String, RemoteTransfer> _transfers = new ConcurrentHashMap<>();

private long _performanceMarkerPeriod;
private CellStub _genericTransferManager;
Expand Down Expand Up @@ -683,17 +684,37 @@ public ListenableFuture<Optional<String>> acceptRequest(
return transfer.start();
}

/**
* Create a transfer id based on envelope source and message id.
* @param envelope message envelope.
* @param message message.
* @return transfer id.
*/
private static String getTransferId(CellMessage envelope, Message message) {
return envelope.getSourceAddress() + "-" +message.getId();
}

/**
* Create a transfer id based on cell stub destination and the specified id.
* @param cellStub cellStub to use.
* @param id message id.
* @return transfer id.
*/
private static String getTransferId(CellStub cellStub, long id) {
return cellStub.getDestinationPath().toAddressString() + "-" + id;
}

public void messageArrived(CellMessage envelope, TransferCompleteMessage message) {
messageArrived(Duration.of(envelope.getLocalAge(), MILLIS));
RemoteTransfer transfer = _transfers.get(message.getId());
RemoteTransfer transfer = _transfers.get(getTransferId(envelope, message));
if (transfer != null) {
_activity.execute(() -> transfer.completed(null));
}
}

public void messageArrived(CellMessage envelope, TransferFailedMessage message) {
messageArrived(Duration.of(envelope.getLocalAge(), MILLIS));
RemoteTransfer transfer = _transfers.get(message.getId());
RemoteTransfer transfer = _transfers.get(getTransferId(envelope, message));
if (transfer != null) {
String error = String.valueOf(message.getErrorObject());
_activity.execute(() -> transfer.completed(error));
Expand Down Expand Up @@ -924,7 +945,7 @@ public synchronized ListenableFuture<Optional<String>> start()
_transferManager = _genericTransferManager.withDestination(path);
}
_id = response.getId();
_transfers.put(_id, this);
_transfers.put(getTransferId(_transferManager, _id), this);
addDigestResponseHeader(attributes);
} catch (NoRouteToCellException | TimeoutCacheException e) {
LOGGER.error("Failed to send request to transfer manager: {}", e.getMessage());
Expand Down Expand Up @@ -1114,7 +1135,7 @@ private void addDigestResponseHeader(FileAttributes attributes) {
}

private void completed(String transferError) {
if (_transfers.remove(_id) == null) {
if (_transfers.remove(getTransferId(_transferManager, _id)) == null) {
// Something else called complete, so do nothing.
return;
}
Expand Down

0 comments on commit 5cf1c75

Please sign in to comment.