diff --git a/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsMover.java b/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsMover.java index 095e91f00d6..6cdd56af9ea 100644 --- a/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsMover.java +++ b/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsMover.java @@ -126,11 +126,14 @@ void disable(Throwable error) { * * @param session to attach to */ - synchronized void attachSession(NFSv41Session session) { + synchronized boolean attachSession(NFSv41Session session) { + if (_session == null) { _session = session; _session.getClient().attachState(_state); + return true; } + return false; } /** diff --git a/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsTransferService.java b/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsTransferService.java index 57ebb9c496b..ab9f61c5682 100644 --- a/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsTransferService.java +++ b/modules/dcache-nfs/src/main/java/org/dcache/chimera/nfsv41/mover/NfsTransferService.java @@ -387,6 +387,9 @@ public boolean remove(NfsMover mover) { @Nullable NfsMover getMoverByStateId(CompoundContext context, stateid4 stateid) { NfsMover mover = _activeIO.get(stateid); if (mover != null) { + if (mover.attachSession(context.getSession())) { + mover.setLocalEndpoint(context.getRemoteSocketAddress()); + } mover.attachSession(context.getSession()); } return mover; diff --git a/modules/dcache-vehicles/src/main/java/diskCacheV111/vehicles/MoverInfoMessage.java b/modules/dcache-vehicles/src/main/java/diskCacheV111/vehicles/MoverInfoMessage.java index 7fa1ef76420..8dd74776071 100644 --- a/modules/dcache-vehicles/src/main/java/diskCacheV111/vehicles/MoverInfoMessage.java +++ b/modules/dcache-vehicles/src/main/java/diskCacheV111/vehicles/MoverInfoMessage.java @@ -2,6 +2,8 @@ import diskCacheV111.util.PnfsId; import dmg.cells.nucleus.CellAddressCore; + +import java.net.InetSocketAddress; import java.time.Duration; import java.util.Optional; @@ -24,6 +26,8 @@ public class MoverInfoMessage extends PnfsFileInfoMessage { private static final long serialVersionUID = -7013160118909496211L; private String _transferPath; + private InetSocketAddress _localEndpoint; + public MoverInfoMessage(CellAddressCore address, PnfsId pnfsId) { super("transfer", "pool", address, pnfsId); } @@ -129,6 +133,13 @@ public Optional getWriteActive() { return Optional.ofNullable(_writeActive); } + public void setLocalEndpoint(InetSocketAddress endpoint) { + _localEndpoint = endpoint; + } + + public Optional getLocalEndpoint() { + return Optional.ofNullable(_localEndpoint); + } @Override public String toString() { return "MoverInfoMessage{" + @@ -145,6 +156,7 @@ public String toString() { ", readActive='" + _readActive + '\'' + ", writeIdle='" + _writeIdle + '\'' + ", writeActive='" + _writeActive + '\'' + + ", localEndporint='" + _localEndpoint + '\'' + "} " + super.toString(); } diff --git a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/pool/XrootdTransferService.java b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/pool/XrootdTransferService.java index 5ebd317d58e..59844cb7aa2 100644 --- a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/pool/XrootdTransferService.java +++ b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/pool/XrootdTransferService.java @@ -1,6 +1,6 @@ /* dCache - http://www.dcache.org/ * - * Copyright (C) 2013-2015 Deutsches Elektronen-Synchrotron + * Copyright (C) 2013-2023 Deutsches Elektronen-Synchrotron * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -41,7 +41,6 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketException; import java.util.EnumSet; @@ -64,7 +63,6 @@ import org.dcache.pool.movers.NettyMover; import org.dcache.pool.movers.NettyTransferService; import org.dcache.util.CDCThreadFactory; -import org.dcache.util.NetworkUtils; import org.dcache.vehicles.XrootdDoorAdressInfoMessage; import org.dcache.vehicles.XrootdProtocolInfo; import org.dcache.xrootd.OutboundExceptionHandler; @@ -386,17 +384,15 @@ protected UUID createUuid(XrootdProtocolInfo protocolInfo) { * Sends our address to the door. Copied from the old xrootd mover. */ @Override - protected void sendAddressToDoor(NettyMover mover, int port) + protected void sendAddressToDoor(NettyMover mover, InetSocketAddress localEndpoint) throws SocketException, CacheException { XrootdProtocolInfo protocolInfo = mover.getProtocolInfo(); - InetAddress localIP = NetworkUtils.getLocalAddress( - protocolInfo.getSocketAddress().getAddress()); CellPath cellpath = protocolInfo.getXrootdDoorCellPath(); + XrootdDoorAdressInfoMessage doorMsg = - new XrootdDoorAdressInfoMessage(protocolInfo.getXrootdFileHandle(), - new InetSocketAddress(localIP, port)); + new XrootdDoorAdressInfoMessage(protocolInfo.getXrootdFileHandle(), localEndpoint); doorStub.notify(cellpath, doorMsg); - LOGGER.debug("sending redirect {} to Xrootd-door {}", localIP, cellpath); + LOGGER.debug("sending redirect {} to Xrootd-door {}", localEndpoint, cellpath); } @Override diff --git a/modules/dcache/src/main/java/org/dcache/http/HttpTransferService.java b/modules/dcache/src/main/java/org/dcache/http/HttpTransferService.java index 6d4a8519670..c4a22518019 100644 --- a/modules/dcache/src/main/java/org/dcache/http/HttpTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/http/HttpTransferService.java @@ -39,14 +39,14 @@ import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; -import java.net.InetAddress; + +import java.net.InetSocketAddress; import java.net.SocketException; import java.net.URI; import java.net.URISyntaxException; import java.util.UUID; import org.dcache.pool.movers.NettyMover; import org.dcache.pool.movers.NettyTransferService; -import org.dcache.util.NetworkUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Required; @@ -110,12 +110,12 @@ protected UUID createUuid(HttpProtocolInfo protocolInfo) { * Send the network address of this mover to the door, along with the UUID identifying it. */ @Override - protected void sendAddressToDoor(NettyMover mover, int port) + protected void sendAddressToDoor(NettyMover mover, InetSocketAddress localEndppoint) throws SocketException, CacheException { HttpProtocolInfo protocolInfo = mover.getProtocolInfo(); String uri; try { - uri = getUri(protocolInfo, port, mover.getUuid()).toASCIIString(); + uri = getUri(protocolInfo, localEndppoint, mover.getUuid()).toASCIIString(); } catch (URISyntaxException e) { throw new RuntimeException( "Failed to create URI for HTTP mover. Please report to support@dcache.org", e); @@ -130,18 +130,16 @@ protected void sendAddressToDoor(NettyMover mover, int port) doorStub.notify(new CellPath(httpDoor), httpDoorMessage); } - protected URI getUri(HttpProtocolInfo protocolInfo, int port, UUID uuid) + protected URI getUri(HttpProtocolInfo protocolInfo, InetSocketAddress localEndpoint, UUID uuid) throws SocketException, CacheException, URISyntaxException { String path = protocolInfo.getPath(); if (!path.startsWith("/")) { path = "/" + path; } - InetAddress localIP = - NetworkUtils.getLocalAddress(protocolInfo.getSocketAddress().getAddress()); return new URI(PROTOCOL_HTTP, null, - localIP.getHostAddress(), - port, + localEndpoint.getAddress().getHostAddress(), + localEndpoint.getPort(), path, UUID_QUERY_PARAM + QUERY_PARAM_ASSIGN + uuid.toString(), null); diff --git a/modules/dcache/src/main/java/org/dcache/http/HttpsTransferService.java b/modules/dcache/src/main/java/org/dcache/http/HttpsTransferService.java index dbb8e7ad31d..a9a6b311575 100644 --- a/modules/dcache/src/main/java/org/dcache/http/HttpsTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/http/HttpsTransferService.java @@ -25,7 +25,9 @@ import io.netty.handler.codec.http.cors.CorsConfigBuilder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; + import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.SocketException; import java.net.URI; import java.net.URISyntaxException; @@ -70,10 +72,10 @@ CorsConfigBuilder corsConfigBuilder() { } @Override - protected URI getUri(HttpProtocolInfo protocolInfo, int port, UUID uuid) + protected URI getUri(HttpProtocolInfo protocolInfo, InetSocketAddress localEndpoint, UUID uuid) throws SocketException, CacheException, URISyntaxException { - URI plainUrl = super.getUri(protocolInfo, port, uuid); + URI plainUrl = super.getUri(protocolInfo, localEndpoint, uuid); String host = getHost(plainUrl); try { if (InetAddresses.isInetAddress(host)) { diff --git a/modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializer.java b/modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializer.java index b989a8017f0..efcffa6fa58 100644 --- a/modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializer.java +++ b/modules/dcache/src/main/java/org/dcache/notification/BillingMessageSerializer.java @@ -2,6 +2,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import com.google.common.net.InetAddresses; import diskCacheV111.vehicles.IpProtocolInfo; import diskCacheV111.vehicles.MoverInfoMessage; import java.net.InetSocketAddress; @@ -40,6 +41,7 @@ static JSONObject transform(MoverInfoMessage data) { data.getWriteIdle().ifPresent(d -> o.put("writeIdle", d.toString())); data.getReadActive().ifPresent(d -> o.put("readActive", d.toString())); data.getWriteActive().ifPresent(d -> o.put("writeActive", d.toString())); + data.getLocalEndpoint().ifPresent(d -> o.put("localEndpoint", InetAddresses.toUriString(d.getAddress()) + ":" + d.getPort())); JSONObject status = new JSONObject(); status.put("code", data.getResultCode()); diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java index 666c2a48636..b733335e27a 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java @@ -1,6 +1,6 @@ /* dCache - http://www.dcache.org/ * - * Copyright (C) 2013 - 2020 Deutsches Elektronen-Synchrotron + * Copyright (C) 2013 - 2023 Deutsches Elektronen-Synchrotron * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -181,6 +181,7 @@ public MoverInfoMessage generateBillingMessage(Mover mover, long fileSize) { mover.getProtocolInfo()); info.setBillingPath(mover.getBillingPath()); info.setTransferPath(mover.getTransferPath()); + mover.getLocalEndpoint().ifPresent(info::setLocalEndpoint); MoverInfoMessage infoWithStats = mover.getChannel() .flatMap(c -> c.optionallyAs(IoStatisticsChannel.class)) diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/AbstractMover.java b/modules/dcache/src/main/java/org/dcache/pool/movers/AbstractMover.java index a34a324a20a..f462f440793 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/AbstractMover.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/AbstractMover.java @@ -1,6 +1,6 @@ /* dCache - http://www.dcache.org/ * - * Copyright (C) 2013 - 2019 Deutsches Elektronen-Synchrotron + * Copyright (C) 2013 - 2023 Deutsches Elektronen-Synchrotron * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -31,6 +31,7 @@ import dmg.cells.nucleus.CellPath; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.InetSocketAddress; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.CompletionHandler; import java.nio.file.OpenOption; @@ -82,6 +83,8 @@ public abstract class AbstractMover

_channel = Optional.empty(); + private volatile InetSocketAddress _localEndpoint; + public AbstractMover(ReplicaDescriptor handle, PoolIoFileMessage message, CellPath pathToDoor, TransferService transferService) { TypeToken type = new TypeToken(getClass()) { @@ -354,4 +357,13 @@ public String toString() { } protected abstract String getStatus(); + + public void setLocalEndpoint(InetSocketAddress addr) { + _localEndpoint = addr; + } + + @Override + public Optional getLocalEndpoint() { + return Optional.ofNullable(_localEndpoint); + } } diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/Mover.java b/modules/dcache/src/main/java/org/dcache/pool/movers/Mover.java index 946595c3da3..8f5fb386f63 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/Mover.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/Mover.java @@ -1,6 +1,6 @@ /* dCache - http://www.dcache.org/ * - * Copyright (C) 2013 - 2017 Deutsches Elektronen-Synchrotron + * Copyright (C) 2013 - 2023 Deutsches Elektronen-Synchrotron * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -198,4 +198,10 @@ default List remoteConnections() { default Long getBytesExpected() { return null; } + + /** + * Returns the {@link Optional} containing {@link InetSocketAddress} of the local endpoint used by clients + * to access the mover. + */ + Optional getLocalEndpoint(); } diff --git a/modules/dcache/src/main/java/org/dcache/pool/movers/NettyTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/movers/NettyTransferService.java index 4327759b658..b7873b92fe6 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/movers/NettyTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/movers/NettyTransferService.java @@ -1,6 +1,6 @@ /* dCache - http://www.dcache.org/ * - * Copyright (C) 2013 - 2019 Deutsches Elektronen-Synchrotron + * Copyright (C) 2013 - 2023 Deutsches Elektronen-Synchrotron * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import diskCacheV111.util.CacheException; import diskCacheV111.util.TimeoutCacheException; +import diskCacheV111.vehicles.IpProtocolInfo; import diskCacheV111.vehicles.PoolIoFileMessage; import diskCacheV111.vehicles.ProtocolInfo; import dmg.cells.nucleus.CDC; @@ -44,6 +45,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; @@ -68,6 +70,7 @@ import org.dcache.util.ChecksumType; import org.dcache.util.FireAndForgetTask; import org.dcache.util.NettyPortRange; +import org.dcache.util.NetworkUtils; import org.dcache.util.TryCatchTemplate; import org.dcache.vehicles.FileAttributes; import org.slf4j.Logger; @@ -423,7 +426,12 @@ public void execute() } conditionallyStartServer(); setCancellable(channel); - sendAddressToDoor(mover, getServerAddress().getPort()); + InetAddress localIP = + NetworkUtils.getLocalAddress(((IpProtocolInfo)mover.getProtocolInfo()).getSocketAddress().getAddress()); + + InetSocketAddress localEndpoint = new InetSocketAddress(localIP, getServerAddress().getPort()); + mover.setLocalEndpoint(localEndpoint); + sendAddressToDoor(mover, localEndpoint); } @Override @@ -674,7 +682,7 @@ protected void executeMoverClose(NettyMover

mover, postTransferService.execute(mover, completionHandler); } - protected abstract void sendAddressToDoor(NettyMover

mover, int port) + protected abstract void sendAddressToDoor(NettyMover

mover, InetSocketAddress localEndpoint) throws Exception; protected abstract UUID createUuid(P protocolInfo);