From 5cf1c757c954b4dec0b4e568024d67e241b71853 Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan Date: Fri, 19 Apr 2024 14:00:49 +0200 Subject: [PATCH] webdav: use transfermanager+id to identify TPC transfer Motivation: The WebDAV door keeps track of TPC transfers based on transfer IDs generated by RTM. As RTM generates transfer IDs based on the current timestamp, in deployments where multiple RTMs are running, if two transfers start at the same point in time (with millisecond precision), one of them will be lost. As soon as the first one completes, the second transfer becomes an orphan: ``` Perf Marker Timestamp: 1713527368 State: Running State description: Mover created Stripe Index: 0 Stripe Start Time: 1713527348 Stripe Last Transferred: 1713527348 Stripe Transfer Time: 19 Stripe Bytes Transferred: 0 Stripe Status: RUNNING Total Stripe Count: 1 RemoteConnections: tcp:127.0.0.1:9000 End Perf Marker Timestamp: 1713527373 State: Unknown transfer State description: Unknown transfer Stripe Index: 0 Total Stripe Count: 1 End ``` Modification: Update RemoteTransferHandler to use `transfermanager+id` as transfer identity to avoid this ambiguity. 
Result: no transfer id collisions Fixes: #7548 Acked-by: Marina Sahakyan Acked-by: Svenja Meyer Acked-by: Dmitry Lirvintsev Target: master, 10.0, 9.2 Require-book: no Require-notes: yes (cherry picked from commit 412bfe2a33a3c58584b5447c47940b0b396511dd) Signed-off-by: Tigran Mkrtchyan --- .../transfer/RemoteTransferHandler.java | 33 +++++++++++++++---- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/modules/dcache-webdav/src/main/java/org/dcache/webdav/transfer/RemoteTransferHandler.java b/modules/dcache-webdav/src/main/java/org/dcache/webdav/transfer/RemoteTransferHandler.java index c3e562b66d1..fa5b1765433 100644 --- a/modules/dcache-webdav/src/main/java/org/dcache/webdav/transfer/RemoteTransferHandler.java +++ b/modules/dcache-webdav/src/main/java/org/dcache/webdav/transfer/RemoteTransferHandler.java @@ -1,6 +1,6 @@ /* dCache - http://www.dcache.org/ * - * Copyright (C) 2014-2022 Deutsches Elektronen-Synchrotron + * Copyright (C) 2014-2024 Deutsches Elektronen-Synchrotron * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -59,6 +59,7 @@ import diskCacheV111.vehicles.IoDoorEntry; import diskCacheV111.vehicles.IoJobInfo; import diskCacheV111.vehicles.IpProtocolInfo; +import diskCacheV111.vehicles.Message; import diskCacheV111.vehicles.PnfsCreateEntryMessage; import diskCacheV111.vehicles.RemoteHttpDataTransferProtocolInfo; import diskCacheV111.vehicles.RemoteHttpsDataTransferProtocolInfo; @@ -268,7 +269,7 @@ public enum TransferFlag { private static final Duration CELL_MESSAGE_LATENCY = Duration.of(2, MINUTES); private final Queue _messageQueueTime = EvictingQueue.create(MESSAGE_QUEUE_HISTORY); - private final Map _transfers = new ConcurrentHashMap<>(); + private final Map _transfers = new ConcurrentHashMap<>(); private long _performanceMarkerPeriod; private CellStub _genericTransferManager; @@ -683,9 +684,29 @@ public ListenableFuture> acceptRequest( return 
transfer.start(); } + /** + * Create a transfer id based on envelope source and message id. + * @param envelope message envelope. + * @param message message. + * @return transfer id. + */ + private static String getTransferId(CellMessage envelope, Message message) { + return envelope.getSourceAddress() + "-" +message.getId(); + } + + /** + * Create a transfer id based on cell stub destination and the specified id. + * @param cellStub cellStub to use. + * @param id message id. + * @return transfer id. + */ + private static String getTransferId(CellStub cellStub, long id) { + return cellStub.getDestinationPath().toAddressString() + "-" + id; + } + public void messageArrived(CellMessage envelope, TransferCompleteMessage message) { messageArrived(Duration.of(envelope.getLocalAge(), MILLIS)); - RemoteTransfer transfer = _transfers.get(message.getId()); + RemoteTransfer transfer = _transfers.get(getTransferId(envelope, message)); if (transfer != null) { _activity.execute(() -> transfer.completed(null)); } @@ -693,7 +714,7 @@ public void messageArrived(CellMessage envelope, TransferCompleteMessage message public void messageArrived(CellMessage envelope, TransferFailedMessage message) { messageArrived(Duration.of(envelope.getLocalAge(), MILLIS)); - RemoteTransfer transfer = _transfers.get(message.getId()); + RemoteTransfer transfer = _transfers.get(getTransferId(envelope, message)); if (transfer != null) { String error = String.valueOf(message.getErrorObject()); _activity.execute(() -> transfer.completed(error)); @@ -924,7 +945,7 @@ public synchronized ListenableFuture> start() _transferManager = _genericTransferManager.withDestination(path); } _id = response.getId(); - _transfers.put(_id, this); + _transfers.put(getTransferId(_transferManager, _id), this); addDigestResponseHeader(attributes); } catch (NoRouteToCellException | TimeoutCacheException e) { LOGGER.error("Failed to send request to transfer manager: {}", e.getMessage()); @@ -1114,7 +1135,7 @@ private void 
addDigestResponseHeader(FileAttributes attributes) { } private void completed(String transferError) { - if (_transfers.remove(_id) == null) { + if (_transfers.remove(getTransferId(_transferManager, _id)) == null) { // Something else called complete, so do nothing. return; }