Skip to content

Commit

Permalink
pool: include local endpoint information into billing info
Browse files Browse the repository at this point in the history
Motivation:
some monitoring infrastructures, line WLCG ops or ESnet packet tracing
require to track network connection and in addition to client's IP
pool's local IP address as well.

Modification:
Update Mover class to optionally provide pool's endpoint, if client has
connected. Update netty and nfs movers to populate movers with the info.
Update kafka serializers to populate kafka messages with desired info.

Result:

```
    "cellDomain": "dCacheDomain",
    "isP2p": false,
    "transferTime": 12669,
    "version": "1.0",
    "transferSize": 2037372928,
    "localEndpoint": "127.0.0.1:20188",
    "protocolInfo": {
      "protocol": "Http",
      "port": 0,
      "host": "127.0.0.1",
      "versionMajor": 1,
      "versionMinor": 1
    },
```

Acked-by: Albert Rossi
Target: master
Require-book: no
Require-notes: yes
  • Loading branch information
kofemann committed May 12, 2023
1 parent 8819d13 commit 050f680
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 27 deletions.
Expand Up @@ -126,11 +126,14 @@ void disable(Throwable error) {
*
* @param session to attach to
*/
synchronized void attachSession(NFSv41Session session) {
synchronized boolean attachSession(NFSv41Session session) {

if (_session == null) {
_session = session;
_session.getClient().attachState(_state);
return true;
}
return false;
}

/**
Expand Down
Expand Up @@ -387,6 +387,9 @@ public boolean remove(NfsMover mover) {
@Nullable NfsMover getMoverByStateId(CompoundContext context, stateid4 stateid) {
NfsMover mover = _activeIO.get(stateid);
if (mover != null) {
if (mover.attachSession(context.getSession())) {
mover.setLocalEndpoint(context.getRemoteSocketAddress());
}
mover.attachSession(context.getSession());
}
return mover;
Expand Down
Expand Up @@ -2,6 +2,8 @@

import diskCacheV111.util.PnfsId;
import dmg.cells.nucleus.CellAddressCore;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Optional;

Expand All @@ -24,6 +26,8 @@ public class MoverInfoMessage extends PnfsFileInfoMessage {
private static final long serialVersionUID = -7013160118909496211L;
private String _transferPath;

private InetSocketAddress _localEndpoint;

public MoverInfoMessage(CellAddressCore address, PnfsId pnfsId) {
super("transfer", "pool", address, pnfsId);
}
Expand Down Expand Up @@ -129,6 +133,13 @@ public Optional<Duration> getWriteActive() {
return Optional.ofNullable(_writeActive);
}

public void setLocalEndpoint(InetSocketAddress endpoint) {
_localEndpoint = endpoint;
}

public Optional<InetSocketAddress> getLocalEndpoint() {
return Optional.ofNullable(_localEndpoint);
}
@Override
public String toString() {
return "MoverInfoMessage{" +
Expand All @@ -145,6 +156,7 @@ public String toString() {
", readActive='" + _readActive + '\'' +
", writeIdle='" + _writeIdle + '\'' +
", writeActive='" + _writeActive + '\'' +
", localEndporint='" + _localEndpoint + '\'' +
"} " + super.toString();
}

Expand Down
@@ -1,6 +1,6 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2013-2015 Deutsches Elektronen-Synchrotron
* Copyright (C) 2013-2023 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -41,7 +41,6 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.EnumSet;
Expand All @@ -64,7 +63,6 @@
import org.dcache.pool.movers.NettyMover;
import org.dcache.pool.movers.NettyTransferService;
import org.dcache.util.CDCThreadFactory;
import org.dcache.util.NetworkUtils;
import org.dcache.vehicles.XrootdDoorAdressInfoMessage;
import org.dcache.vehicles.XrootdProtocolInfo;
import org.dcache.xrootd.OutboundExceptionHandler;
Expand Down Expand Up @@ -386,17 +384,15 @@ protected UUID createUuid(XrootdProtocolInfo protocolInfo) {
* Sends our address to the door. Copied from the old xrootd mover.
*/
@Override
protected void sendAddressToDoor(NettyMover<XrootdProtocolInfo> mover, int port)
protected void sendAddressToDoor(NettyMover<XrootdProtocolInfo> mover, InetSocketAddress localEndpoint)
throws SocketException, CacheException {
XrootdProtocolInfo protocolInfo = mover.getProtocolInfo();
InetAddress localIP = NetworkUtils.getLocalAddress(
protocolInfo.getSocketAddress().getAddress());
CellPath cellpath = protocolInfo.getXrootdDoorCellPath();

XrootdDoorAdressInfoMessage doorMsg =
new XrootdDoorAdressInfoMessage(protocolInfo.getXrootdFileHandle(),
new InetSocketAddress(localIP, port));
new XrootdDoorAdressInfoMessage(protocolInfo.getXrootdFileHandle(), localEndpoint);
doorStub.notify(cellpath, doorMsg);
LOGGER.debug("sending redirect {} to Xrootd-door {}", localIP, cellpath);
LOGGER.debug("sending redirect {} to Xrootd-door {}", localEndpoint, cellpath);
}

@Override
Expand Down
Expand Up @@ -39,14 +39,14 @@
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetAddress;

import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import org.dcache.pool.movers.NettyMover;
import org.dcache.pool.movers.NettyTransferService;
import org.dcache.util.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
Expand Down Expand Up @@ -110,12 +110,12 @@ protected UUID createUuid(HttpProtocolInfo protocolInfo) {
* Send the network address of this mover to the door, along with the UUID identifying it.
*/
@Override
protected void sendAddressToDoor(NettyMover<HttpProtocolInfo> mover, int port)
protected void sendAddressToDoor(NettyMover<HttpProtocolInfo> mover, InetSocketAddress localEndppoint)
throws SocketException, CacheException {
HttpProtocolInfo protocolInfo = mover.getProtocolInfo();
String uri;
try {
uri = getUri(protocolInfo, port, mover.getUuid()).toASCIIString();
uri = getUri(protocolInfo, localEndppoint, mover.getUuid()).toASCIIString();
} catch (URISyntaxException e) {
throw new RuntimeException(
"Failed to create URI for HTTP mover. Please report to support@dcache.org", e);
Expand All @@ -130,18 +130,16 @@ protected void sendAddressToDoor(NettyMover<HttpProtocolInfo> mover, int port)
doorStub.notify(new CellPath(httpDoor), httpDoorMessage);
}

protected URI getUri(HttpProtocolInfo protocolInfo, int port, UUID uuid)
protected URI getUri(HttpProtocolInfo protocolInfo, InetSocketAddress localEndpoint, UUID uuid)
throws SocketException, CacheException, URISyntaxException {
String path = protocolInfo.getPath();
if (!path.startsWith("/")) {
path = "/" + path;
}
InetAddress localIP =
NetworkUtils.getLocalAddress(protocolInfo.getSocketAddress().getAddress());
return new URI(PROTOCOL_HTTP,
null,
localIP.getHostAddress(),
port,
localEndpoint.getAddress().getHostAddress(),
localEndpoint.getPort(),
path,
UUID_QUERY_PARAM + QUERY_PARAM_ASSIGN + uuid.toString(),
null);
Expand Down
Expand Up @@ -25,7 +25,9 @@
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -70,10 +72,10 @@ CorsConfigBuilder corsConfigBuilder() {
}

@Override
protected URI getUri(HttpProtocolInfo protocolInfo, int port, UUID uuid)
protected URI getUri(HttpProtocolInfo protocolInfo, InetSocketAddress localEndpoint, UUID uuid)
throws SocketException, CacheException, URISyntaxException {

URI plainUrl = super.getUri(protocolInfo, port, uuid);
URI plainUrl = super.getUri(protocolInfo, localEndpoint, uuid);
String host = getHost(plainUrl);
try {
if (InetAddresses.isInetAddress(host)) {
Expand Down
Expand Up @@ -2,6 +2,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.net.InetAddresses;
import diskCacheV111.vehicles.IpProtocolInfo;
import diskCacheV111.vehicles.MoverInfoMessage;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -40,6 +41,7 @@ static JSONObject transform(MoverInfoMessage data) {
data.getWriteIdle().ifPresent(d -> o.put("writeIdle", d.toString()));
data.getReadActive().ifPresent(d -> o.put("readActive", d.toString()));
data.getWriteActive().ifPresent(d -> o.put("writeActive", d.toString()));
data.getLocalEndpoint().ifPresent(d -> o.put("localEndpoint", InetAddresses.toUriString(d.getAddress()) + ":" + d.getPort()));

JSONObject status = new JSONObject();
status.put("code", data.getResultCode());
Expand Down
@@ -1,6 +1,6 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2013 - 2020 Deutsches Elektronen-Synchrotron
* Copyright (C) 2013 - 2023 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -181,6 +181,7 @@ public MoverInfoMessage generateBillingMessage(Mover<?> mover, long fileSize) {
mover.getProtocolInfo());
info.setBillingPath(mover.getBillingPath());
info.setTransferPath(mover.getTransferPath());
mover.getLocalEndpoint().ifPresent(info::setLocalEndpoint);

MoverInfoMessage infoWithStats = mover.getChannel()
.flatMap(c -> c.optionallyAs(IoStatisticsChannel.class))
Expand Down
@@ -1,6 +1,6 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2013 - 2019 Deutsches Elektronen-Synchrotron
* Copyright (C) 2013 - 2023 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -31,6 +31,7 @@
import dmg.cells.nucleus.CellPath;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.CompletionHandler;
import java.nio.file.OpenOption;
Expand Down Expand Up @@ -82,6 +83,8 @@ public abstract class AbstractMover<P extends ProtocolInfo, M extends AbstractMo
private volatile ChecksumChannel _checksumChannel;
private volatile Optional<RepositoryChannel> _channel = Optional.empty();

private volatile InetSocketAddress _localEndpoint;

public AbstractMover(ReplicaDescriptor handle, PoolIoFileMessage message, CellPath pathToDoor,
TransferService<M> transferService) {
TypeToken<M> type = new TypeToken<M>(getClass()) {
Expand Down Expand Up @@ -354,4 +357,13 @@ public String toString() {
}

protected abstract String getStatus();

public void setLocalEndpoint(InetSocketAddress addr) {
_localEndpoint = addr;
}

@Override
public Optional<InetSocketAddress> getLocalEndpoint() {
return Optional.ofNullable(_localEndpoint);
}
}
@@ -1,6 +1,6 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2013 - 2017 Deutsches Elektronen-Synchrotron
* Copyright (C) 2013 - 2023 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -198,4 +198,10 @@ default List<InetSocketAddress> remoteConnections() {
default Long getBytesExpected() {
return null;
}

/**
* Returns the {@link Optional} containing {@link InetSocketAddress} of the local endpoint used by clients
* to access the mover.
*/
Optional<InetSocketAddress> getLocalEndpoint();
}
@@ -1,6 +1,6 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2013 - 2019 Deutsches Elektronen-Synchrotron
* Copyright (C) 2013 - 2023 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand All @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.TimeoutCacheException;
import diskCacheV111.vehicles.IpProtocolInfo;
import diskCacheV111.vehicles.PoolIoFileMessage;
import diskCacheV111.vehicles.ProtocolInfo;
import dmg.cells.nucleus.CDC;
Expand All @@ -44,6 +45,7 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
Expand All @@ -68,6 +70,7 @@
import org.dcache.util.ChecksumType;
import org.dcache.util.FireAndForgetTask;
import org.dcache.util.NettyPortRange;
import org.dcache.util.NetworkUtils;
import org.dcache.util.TryCatchTemplate;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
Expand Down Expand Up @@ -423,7 +426,12 @@ public void execute()
}
conditionallyStartServer();
setCancellable(channel);
sendAddressToDoor(mover, getServerAddress().getPort());
InetAddress localIP =
NetworkUtils.getLocalAddress(((IpProtocolInfo)mover.getProtocolInfo()).getSocketAddress().getAddress());

InetSocketAddress localEndpoint = new InetSocketAddress(localIP, getServerAddress().getPort());
mover.setLocalEndpoint(localEndpoint);
sendAddressToDoor(mover, localEndpoint);
}

@Override
Expand Down Expand Up @@ -674,7 +682,7 @@ protected void executeMoverClose(NettyMover<P> mover,
postTransferService.execute(mover, completionHandler);
}

protected abstract void sendAddressToDoor(NettyMover<P> mover, int port)
protected abstract void sendAddressToDoor(NettyMover<P> mover, InetSocketAddress localEndpoint)
throws Exception;

protected abstract UUID createUuid(P protocolInfo);
Expand Down

0 comments on commit 050f680

Please sign in to comment.