Skip to content

Commit

Permalink
Remove transport name from tcp channel (#40074)
Browse files Browse the repository at this point in the history
Currently, we maintain a transport name ("mock-nio", "nio", "netty")
that is passed to a `TcpTransportChannel` when a request is received.
The value of this name is to associate with the task when we register a
task with the task manager. However, it is only possible to run ES with
one transport, so having an implementation specific name is unnecessary.
This commit removes the name and replaces it with the generic
"transport".
  • Loading branch information
Tim-Brooks committed Mar 15, 2019
1 parent 6ffa8a0 commit 0b50a67
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 16 deletions.
Expand Up @@ -108,7 +108,7 @@ public class Netty4Transport extends TcpTransport {
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super("netty", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
this.workerCount = WORKER_COUNT.get(settings);

Expand Down
Expand Up @@ -64,7 +64,7 @@ public class NioTransport extends TcpTransport {
protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService, NioGroupFactory groupFactory) {
super("nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
this.groupFactory = groupFactory;
}

Expand Down
Expand Up @@ -129,7 +129,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// connections while no connect operations is going on
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private volatile BoundTransportAddress boundAddress;
private final String transportName;

private final MeanMetric readBytesMetric = new MeanMetric();
private volatile Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers = Collections.emptyMap();
Expand All @@ -141,9 +140,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final OutboundHandler outboundHandler;
private final String nodeName;

public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool,
PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
public TcpTransport(Settings settings, Version version, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
this.settings = settings;
this.profileSettings = getProfileSettings(settings);
this.version = version;
Expand All @@ -152,7 +151,6 @@ public TcpTransport(String transportName, Settings settings, Version version, Th
this.pageCacheRecycler = pageCacheRecycler;
this.circuitBreakerService = circuitBreakerService;
this.networkService = networkService;
this.transportName = transportName;
this.transportLogger = new TransportLogger();
this.outboundHandler = new OutboundHandler(threadPool, bigArrays, transportLogger);
this.handshaker = new TransportHandshaker(version, threadPool,
Expand Down Expand Up @@ -1023,7 +1021,7 @@ protected void handleRequest(TcpChannel channel, InboundMessage.RequestMessage m
} else {
getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes);
}
transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName,
transportChannel = new TcpTransportChannel(this, channel, action, requestId, version, features, profileName,
messageLengthBytes, message.isCompress());
final TransportRequest request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
Expand All @@ -1034,7 +1032,7 @@ protected void handleRequest(TcpChannel channel, InboundMessage.RequestMessage m
} catch (Exception e) {
// the circuit breaker tripped
if (transportChannel == null) {
transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features,
transportChannel = new TcpTransportChannel(this, channel, action, requestId, version, features,
profileName, 0, message.isCompress());
}
try {
Expand Down
Expand Up @@ -35,12 +35,11 @@ public final class TcpTransportChannel implements TransportChannel {
private final long requestId;
private final String profileName;
private final long reservedBytes;
private final String channelType;
private final TcpChannel channel;
private final boolean compressResponse;

TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, long requestId, Version version,
Set<String> features, String profileName, long reservedBytes, boolean compressResponse) {
TcpTransportChannel(TcpTransport transport, TcpChannel channel, String action, long requestId, Version version, Set<String> features,
String profileName, long reservedBytes, boolean compressResponse) {
this.version = version;
this.features = features;
this.channel = channel;
Expand All @@ -49,7 +48,6 @@ public final class TcpTransportChannel implements TransportChannel {
this.requestId = requestId;
this.profileName = profileName;
this.reservedBytes = reservedBytes;
this.channelType = channelType;
this.compressResponse = compressResponse;
}

Expand Down Expand Up @@ -91,7 +89,7 @@ private void release(boolean isExceptionResponse) {

@Override
public String getChannelType() {
return channelType;
return "transport";
}

@Override
Expand Down
Expand Up @@ -164,7 +164,7 @@ public void testCompressRequestAndResponse() throws IOException {
ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName());
AtomicReference<BytesReference> messageCaptor = new AtomicReference<>();
try {
TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool,
TcpTransport transport = new TcpTransport(Settings.EMPTY, Version.CURRENT, threadPool,
PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) {

@Override
Expand Down
Expand Up @@ -75,7 +75,7 @@ public class MockNioTransport extends TcpTransport {
public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super("mock-nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
}

@Override
Expand Down

0 comments on commit 0b50a67

Please sign in to comment.