Skip to content

Commit

Permalink
pool: remote http movers to provide local endpoint info
Browse files Browse the repository at this point in the history
Motivation:
To track HTTP-TPC the local connection info should be included into
mover info.

Modification:
Update legacy mover interface to provide local endpoint. Update
RemoteHttpDataTransferProtocol to populate mover info, if possible.

Result:
HTTP-TPC should be include local endpoint info

Acked-by: Lea Morschel
Acked-by: Albert Rossi
Target: master
Require-book: no
Require-notes: yes
  • Loading branch information
kofemann committed Jul 13, 2023
1 parent fbdc448 commit 1ae82cc
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 1 deletion.
Expand Up @@ -4,6 +4,7 @@
import java.net.InetSocketAddress;
import java.nio.file.OpenOption;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.dcache.pool.repository.RepositoryChannel;
Expand Down Expand Up @@ -62,4 +63,12 @@ default Long getBytesExpected() {
default List<InetSocketAddress> remoteConnections() {
return null;
}

/**
* Returns the {@link Optional} containing {@link InetSocketAddress} of the local endpoint used by clients
* to access the mover.
*/
default Optional<InetSocketAddress> getLocalEndpoint() {
return Optional.empty();
}
}
@@ -1,6 +1,6 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2013 - 2019 Deutsches Elektronen-Synchrotron
* Copyright (C) 2013 - 2023 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand All @@ -22,6 +22,8 @@
import dmg.cells.nucleus.CellPath;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;

import org.dcache.pool.classic.TransferService;
import org.dcache.pool.repository.ReplicaDescriptor;

Expand Down Expand Up @@ -76,4 +78,9 @@ public List<InetSocketAddress> remoteConnections() {
public Long getBytesExpected() {
return _moverProtocol.getBytesExpected();
}

@Override
public Optional<InetSocketAddress> getLocalEndpoint() {
return _moverProtocol.getLocalEndpoint();
}
}
Expand Up @@ -212,6 +212,8 @@ private enum HeaderFlags {

private Long _expectedTransferSize;

private InetSocketAddress _localEndpoint;

public RemoteHttpDataTransferProtocol(CloseableHttpClient client) {
_client = requireNonNull(client);
}
Expand Down Expand Up @@ -474,6 +476,47 @@ private Optional<InetSocketAddress> remoteAddress() {
}
}

private Optional<InetSocketAddress> localAddress() {
HttpContext context = getContext();
if (context == null) {
LOGGER.debug("No HttpContext value");
return Optional.empty();
}

Object conn = context.getAttribute(HttpCoreContext.HTTP_CONNECTION);
if (conn == null) {
LOGGER.debug("HTTP_CONNECTION is null");
return Optional.empty();
}

if (!(conn instanceof HttpInetConnection)) {
throw new RuntimeException("HTTP_CONNECTION has unexpected type: "
+ conn.getClass().getCanonicalName());
}

HttpInetConnection inetConn = (HttpInetConnection) conn;
if (!inetConn.isOpen()) {
LOGGER.debug("HttpConnection is no longer open");
return Optional.empty();
}

try {
InetAddress addr = inetConn.getLocalAddress();
if (addr == null) {
LOGGER.debug("HttpInetConnection is not connected");
return Optional.empty();
}

int port = inetConn.getLocalPort();
InetSocketAddress sockAddr = new InetSocketAddress(addr, port);
return Optional.of(sockAddr);
} catch (ConnectionShutdownException e) {
LOGGER.warn("HTTP_CONNECTION has unexpectedly disconnected");
// Perhaps a race condition here? Hey ho.
return Optional.empty();
}
}

private HttpGet buildGetRequest(RemoteHttpDataTransferProtocolInfo info,
long deadline) {
HttpGet get = new HttpGet(info.getUri());
Expand All @@ -496,6 +539,8 @@ private CloseableHttpResponse doGet(final RemoteHttpDataTransferProtocolInfo inf
HttpGet get = buildGetRequest(info, deadline);
CloseableHttpResponse response = _client.execute(get, context);

_localEndpoint = localAddress().orElse(null);

boolean isSuccessful = false;
try {
while (shouldRetry(response) && System.currentTimeMillis() < deadline) {
Expand Down Expand Up @@ -559,6 +604,7 @@ private void sendFile(RemoteHttpDataTransferProtocolInfo info)
redirectionCount > 0 ? REDIRECTED_REQUEST : INITIAL_REQUEST);

try (CloseableHttpResponse response = _client.execute(put, context)) {
_localEndpoint = localAddress().orElse(null);
StatusLine status = response.getStatusLine();
switch (status.getStatusCode()) {
case 200: /* OK (not actually a valid response from PUT) */
Expand Down Expand Up @@ -934,4 +980,9 @@ public List<InetSocketAddress> remoteConnections() {
public Long getBytesExpected() {
return _expectedTransferSize;
}

@Override
public Optional<InetSocketAddress> getLocalEndpoint() {
return Optional.ofNullable(_localEndpoint);
}
}

0 comments on commit 1ae82cc

Please sign in to comment.