Skip to content

Commit

Permalink
More useful toString() on transport channels (#93648)
Browse files Browse the repository at this point in the history
Today `TaskTransportChannel` has a default `toString()` implementation,
but this appears in logs sometimes and it would be much more useful to
include details of the channel such as the action name, the local/remote
addresses, and the task/request IDs. This commit does so.
  • Loading branch information
DaveCTurner committed Feb 9, 2023
1 parent dad0354 commit 3fa19e5
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,20 @@ public void onFailure(Exception e) {
channel.sendResponse(e);
} catch (Exception sendException) {
sendException.addSuppressed(e);
logger.warn(() -> format("Failed to send error response for action [%s] and request [%s]", actionName, request), sendException);
logger.warn(
() -> format(
"Failed to send error response on channel [%s] for action [%s] and request [%s]",
channel,
actionName,
request
),
sendException
);
}
}

@Override
public String toString() {
return "ChannelActionListener{" + channel + "}{" + request + "}{" + actionName + "}";
return "ChannelActionListener{" + channel + "}{" + actionName + "}{" + request + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void processMessageReceived(Request request, TransportChannel channel) th
final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task);
unregisterTask = Releasables.wrap(unregisterTask, stopTracking);
}
final TaskTransportChannel taskTransportChannel = new TaskTransportChannel(channel, unregisterTask);
final TaskTransportChannel taskTransportChannel = new TaskTransportChannel(task.getId(), channel, unregisterTask);
handler.messageReceived(request, taskTransportChannel, task);
unregisterTask = null;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@
package org.elasticsearch.transport;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Releasable;

import java.io.IOException;

public class TaskTransportChannel implements TransportChannel {

private final long taskId;
private final TransportChannel channel;
private final Releasable onTaskFinished;

TaskTransportChannel(TransportChannel channel, Releasable onTaskFinished) {
TaskTransportChannel(long taskId, TransportChannel channel, Releasable onTaskFinished) {
this.taskId = taskId;
this.channel = channel;
this.onTaskFinished = onTaskFinished;
}
Expand Down Expand Up @@ -59,4 +62,9 @@ public TransportVersion getVersion() {
public TransportChannel getChannel() {
return channel;
}

@Override
public String toString() {
return Strings.format("TaskTransportChannel{task=%d}{%s}", taskId, channel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.transport;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Releasable;

import java.io.IOException;
Expand Down Expand Up @@ -95,4 +96,9 @@ public TransportVersion getVersion() {
public TcpChannel getChannel() {
return channel;
}

@Override
public String toString() {
return Strings.format("TcpTransportChannel{req=%d}{%s}{%s}", requestId, action, channel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,11 @@ public String getChannelType() {
public TransportVersion getVersion() {
return localNode.getVersion().transportVersion;
}

@Override
public String toString() {
return Strings.format("DirectResponseChannel{req=%d}{%s}", requestId, action);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -3038,6 +3039,59 @@ public void testFailToSendIllegalStateException() throws InterruptedException {
assertThat(exception.getCause().getMessage(), equalTo("fail to send"));
}

public void testChannelToString() {
final String ACTION = "internal:action";
serviceA.registerRequestHandler(ACTION, ThreadPool.Names.SAME, TransportRequest.Empty::new, (request, channel, task) -> {
assertThat(
channel.toString(),
allOf(
containsString("DirectResponseChannel"),
containsString('{' + ACTION + '}'),
containsString("TaskTransportChannel{task=" + task.getId() + '}')
)
);
assertThat(new ChannelActionListener<>(channel, ACTION, request).toString(), containsString(channel.toString()));
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
serviceB.registerRequestHandler(ACTION, ThreadPool.Names.SAME, TransportRequest.Empty::new, (request, channel, task) -> {
assertThat(
channel.toString(),
allOf(
containsString("TcpTransportChannel"),
containsString('{' + ACTION + '}'),
containsString("TaskTransportChannel{task=" + task.getId() + '}'),
containsString("localAddress="),
containsString(serviceB.getLocalNode().getAddress().toString())
)
);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});

PlainActionFuture.get(
f -> submitRequest(
serviceA,
serviceA.getLocalNode(),
ACTION,
TransportRequest.Empty.INSTANCE,
new ActionListenerResponseHandler<>(f, ignored -> TransportResponse.Empty.INSTANCE)
),
10,
TimeUnit.SECONDS
);

PlainActionFuture.get(
f -> submitRequest(
serviceA,
serviceB.getLocalNode(),
ACTION,
TransportRequest.Empty.INSTANCE,
new ActionListenerResponseHandler<>(f, ignored -> TransportResponse.Empty.INSTANCE)
),
10,
TimeUnit.SECONDS
);
}

// test that the response handler is invoked on a failure to send
private TransportException doFailToSend(RuntimeException failToSendException) throws InterruptedException {
final TransportInterceptor interceptor = new TransportInterceptor() {
Expand Down

0 comments on commit 3fa19e5

Please sign in to comment.