From 9ab1325953b30fb3d881e9c27b2afdd3274322b6 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 20 Jun 2018 16:34:56 -0600 Subject: [PATCH] Introduce http and tcp server channels (#31446) Historically in TcpTransport server channels were represented by the same channel interface as socket channels. This was necessary as TcpTransport was parameterized by the channel type. This commit introduces TcpServerChannel and HttpServerChannel classes. Additionally, it adds the implementations for the various transports. This allows server channels to have unique functionality and not implement the methods they do not support (such as send and getRemoteAddress). Additionally, with the introduction of HttpServerChannel this commit extracts some of the storing and closing channel work to the abstract http server transport. --- .../http/netty4/Netty4HttpRequestHandler.java | 11 +- .../http/netty4/Netty4HttpServerChannel.java | 76 ++++++++++ .../netty4/Netty4HttpServerTransport.java | 116 +++++---------- .../netty4/Netty4MessageChannelHandler.java | 18 ++- ...yTcpChannel.java => Netty4TcpChannel.java} | 6 +- .../netty4/Netty4TcpServerChannel.java | 84 +++++++++++ .../transport/netty4/Netty4Transport.java | 45 +++--- .../Netty4SizeHeaderFrameDecoderTests.java | 2 +- .../http/nio/NioHttpServerChannel.java | 44 ++++++ .../http/nio/NioHttpServerTransport.java | 134 +++--------------- .../transport/nio/NioTcpServerChannel.java | 23 +-- .../transport/nio/NioTransport.java | 4 +- .../http/AbstractHttpServerTransport.java | 92 ++++++++++-- .../elasticsearch/http/HttpServerChannel.java | 34 +++++ .../transport/TcpServerChannel.java | 46 ++++++ .../elasticsearch/transport/TcpTransport.java | 27 ++-- .../AbstractHttpServerTransportTests.java | 13 +- .../transport/TcpTransportTests.java | 6 +- .../transport/MockTcpTransport.java | 2 +- .../transport/nio/MockNioTransport.java | 18 +-- .../netty4/SecurityNetty4Transport.java | 2 +- .../transport/ServerTransportFilter.java | 6 +- .../transport/nio/SecurityNioTransport.java | 6 +- 23 files changed, 501 insertions(+), 314 deletions(-) create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java rename modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/{NettyTcpChannel.java => Netty4TcpChannel.java} (96%) create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java create mode 100644 plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java create mode 100644 server/src/main/java/org/elasticsearch/http/HttpServerChannel.java create mode 100644 server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index 124bd607ab7ae..ab078ad10d337 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -29,8 +29,6 @@ import org.elasticsearch.http.HttpPipelinedRequest; import org.elasticsearch.transport.netty4.Netty4Utils; -import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY; - @ChannelHandler.Sharable class Netty4HttpRequestHandler extends SimpleChannelInboundHandler> { @@ -42,7 +40,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler msg) throws Exception { - Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get(); + Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); FullHttpRequest request = msg.getRequest(); try { @@ -77,12 +75,11 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest closeContext = new CompletableContext<>(); + + Netty4HttpServerChannel(Channel channel) { + this.channel = channel; + this.channel.closeFuture().addListener(f -> { + if (f.isSuccess()) { + closeContext.complete(null); + } else { + Throwable cause = f.cause(); + if (cause instanceof Error) { + Netty4Utils.maybeDie(cause); + closeContext.completeExceptionally(new Exception(cause)); + } else { + closeContext.completeExceptionally((Exception) cause); + } + } + }); + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channel.localAddress(); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() { + channel.close(); + } + + @Override + public String toString() { + return "Netty4HttpChannel{localAddress=" + getLocalAddress() + "}"; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 8a49ce38b89bc..34f00c0684040 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -23,6 +23,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -42,22 +43,19 @@ import io.netty.util.AttributeKey; import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.CloseableChannel; -import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; -import org.elasticsearch.http.BindHttpException; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpHandlingSettings; -import org.elasticsearch.http.HttpStats; +import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.netty4.cors.Netty4CorsConfig; import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; @@ -65,14 +63,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.netty4.Netty4Utils; -import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; @@ -154,12 +147,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private final int pipeliningMaxEvents; - private final boolean tcpNoDelay; - private final boolean tcpKeepAlive; - private final boolean reuseAddress; - - private final ByteSizeValue tcpSendBufferSize; - private final ByteSizeValue tcpReceiveBufferSize; private final RecvByteBufAllocator recvByteBufAllocator; private final int readTimeoutMillis; @@ -167,8 +154,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { protected volatile ServerBootstrap serverBootstrap; - protected final List serverChannels = new ArrayList<>(); - private final Netty4CorsConfig corsConfig; public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, @@ -184,11 +169,6 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings); this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings); - this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings); - this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings); - this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings); - this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings); - this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings); this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis()); ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings); @@ -217,6 +197,7 @@ protected void doStart() { serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(configureServerChannelHandler()); + serverBootstrap.handler(new ServerChannelExceptionHandler(this)); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings)); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)); @@ -238,10 +219,7 @@ protected void doStart() { serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress); serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress); - this.boundAddress = createBoundHttpAddress(); - if (logger.isInfoEnabled()) { - logger.info("{}", boundAddress); - } + bindServer(); success = true; } finally { if (success == false) { @@ -284,78 +262,29 @@ static Netty4CorsConfig buildCorsConfig(Settings settings) { } @Override - protected TransportAddress bindAddress(final InetAddress hostAddress) { - final AtomicReference lastException = new AtomicReference<>(); - final AtomicReference boundSocket = new AtomicReference<>(); - boolean success = port.iterate(portNumber -> { - try { - synchronized (serverChannels) { - ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)).sync(); - serverChannels.add(future.channel()); - boundSocket.set((InetSocketAddress) future.channel().localAddress()); - } - } catch (Exception e) { - lastException.set(e); - return false; - } - return true; - }); - if (!success) { - throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get()); - } - - if (logger.isDebugEnabled()) { - logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get())); - } - return new TransportAddress(boundSocket.get()); + protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception { + ChannelFuture future = serverBootstrap.bind(socketAddress).sync(); + Channel channel = future.channel(); + Netty4HttpServerChannel httpServerChannel = new Netty4HttpServerChannel(channel); + channel.attr(HTTP_SERVER_CHANNEL_KEY).set(httpServerChannel); + return httpServerChannel; } @Override - protected void doStop() { - synchronized (serverChannels) { - if (!serverChannels.isEmpty()) { - try { - Netty4Utils.closeChannels(serverChannels); - } catch (IOException e) { - logger.trace("exception while closing channels", e); - } finally { - serverChannels.clear(); - } - } - } - - // TODO: Move all of channel closing to abstract class once server channels are handled - try { - CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); - } catch (Exception e) { - logger.warn("unexpected exception while closing http channels", e); - } - httpChannels.clear(); - - - + protected void stopInternal() { if (serverBootstrap != null) { serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(); serverBootstrap = null; } } - @Override - protected void doClose() { - } - - @Override - public HttpStats stats() { - return new HttpStats(httpChannels.size(), totalChannelsAccepted.get()); - } - @Override protected void onException(HttpChannel channel, Exception cause) { if (cause instanceof ReadTimeoutException) { if (logger.isTraceEnabled()) { logger.trace("Http read timeout {}", channel); } - CloseableChannel.closeChannel(channel);; + CloseableChannel.closeChannel(channel); } else { super.onException(channel, cause); } @@ -366,6 +295,7 @@ public ChannelHandler configureServerChannelHandler() { } static final AttributeKey HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel"); + static final AttributeKey HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-http-server-channel"); protected static class HttpChannelHandler extends ChannelInitializer { @@ -413,4 +343,24 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } } + @ChannelHandler.Sharable + private static class ServerChannelExceptionHandler extends ChannelHandlerAdapter { + + private final Netty4HttpServerTransport transport; + + private ServerChannelExceptionHandler(Netty4HttpServerTransport transport) { + this.transport = transport; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Netty4Utils.maybeDie(cause); + Netty4HttpServerChannel httpServerChannel = ctx.channel().attr(HTTP_SERVER_CHANNEL_KEY).get(); + if (cause instanceof Error) { + transport.onServerException(httpServerChannel, new Exception(cause)); + } else { + transport.onServerException(httpServerChannel, (Exception) cause); + } + } + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index 58440ae96e07a..698c86d048c1c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -24,6 +24,8 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.util.Attribute; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpHeader; import org.elasticsearch.transport.Transports; @@ -36,11 +38,9 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { private final Netty4Transport transport; - private final String profileName; - Netty4MessageChannelHandler(Netty4Transport transport, String profileName) { + Netty4MessageChannelHandler(Netty4Transport transport) { this.transport = transport; - this.profileName = profileName; } @Override @@ -58,7 +58,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh // buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize); - Attribute channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY); + Attribute channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY); transport.messageReceived(reference, channelAttribute.get()); } finally { // Set the expected position of the buffer, no matter what happened @@ -69,7 +69,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Netty4Utils.maybeDie(cause); - transport.exceptionCaught(ctx, cause); + final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); + final Throwable newCause = unwrapped != null ? unwrapped : cause; + Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get(); + if (newCause instanceof Error) { + transport.onException(tcpChannel, new Exception(newCause)); + } else { + transport.onException(tcpChannel, (Exception) newCause); + } } - } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java similarity index 96% rename from modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java rename to modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 89fabdcd763d1..78a1425500072 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -30,13 +30,13 @@ import java.net.InetSocketAddress; -public class NettyTcpChannel implements TcpChannel { +public class Netty4TcpChannel implements TcpChannel { private final Channel channel; private final String profile; private final CompletableContext closeContext = new CompletableContext<>(); - NettyTcpChannel(Channel channel, String profile) { + Netty4TcpChannel(Channel channel, String profile) { this.channel = channel; this.profile = profile; this.channel.closeFuture().addListener(f -> { @@ -118,7 +118,7 @@ public Channel getLowLevelChannel() { @Override public String toString() { - return "NettyTcpChannel{" + + return "Netty4TcpChannel{" + "localAddress=" + getLocalAddress() + ", remoteAddress=" + channel.remoteAddress() + '}'; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java new file mode 100644 index 0000000000000..873a6c33fba11 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.netty4; + +import io.netty.channel.Channel; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.concurrent.CompletableContext; +import org.elasticsearch.transport.TcpServerChannel; + +import java.net.InetSocketAddress; + +public class Netty4TcpServerChannel implements TcpServerChannel { + + private final Channel channel; + private final String profile; + private final CompletableContext closeContext = new CompletableContext<>(); + + Netty4TcpServerChannel(Channel channel, String profile) { + this.channel = channel; + this.profile = profile; + this.channel.closeFuture().addListener(f -> { + if (f.isSuccess()) { + closeContext.complete(null); + } else { + Throwable cause = f.cause(); + if (cause instanceof Error) { + Netty4Utils.maybeDie(cause); + closeContext.completeExceptionally(new Exception(cause)); + } else { + closeContext.completeExceptionally((Exception) cause); + } + } + }); + } + + @Override + public String getProfile() { + return profile; + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channel.localAddress(); + } + + @Override + public void close() { + channel.close(); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public String toString() { + return "Netty4TcpChannel{" + + "localAddress=" + getLocalAddress() + + '}'; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 466c4b68bfa4e..c8c6fceb54304 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -25,6 +25,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -37,8 +38,6 @@ import io.netty.util.concurrent.Future; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.collect.Tuple; @@ -196,6 +195,7 @@ private void createServerBootstrap(ProfileSettings profileSettings) { serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(getServerChannelInitializer(name)); + serverBootstrap.handler(new ServerChannelExceptionHandler()); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive); @@ -226,17 +226,11 @@ protected ChannelHandler getClientChannelInitializer() { return new ClientChannelInitializer(); } - static final AttributeKey CHANNEL_KEY = AttributeKey.newInstance("es-channel"); - - protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); - final Throwable t = unwrapped != null ? unwrapped : cause; - Channel channel = ctx.channel(); - onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t)); - } + static final AttributeKey CHANNEL_KEY = AttributeKey.newInstance("es-channel"); + static final AttributeKey SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel"); @Override - protected NettyTcpChannel initiateChannel(InetSocketAddress address, ActionListener listener) throws IOException { + protected Netty4TcpChannel initiateChannel(InetSocketAddress address, ActionListener listener) throws IOException { ChannelFuture channelFuture = bootstrap.connect(address); Channel channel = channelFuture.channel(); if (channel == null) { @@ -245,7 +239,7 @@ protected NettyTcpChannel initiateChannel(InetSocketAddress address, ActionListe } addClosedExceptionLogger(channel); - NettyTcpChannel nettyChannel = new NettyTcpChannel(channel, "default"); + Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default"); channel.attr(CHANNEL_KEY).set(nettyChannel); channelFuture.addListener(f -> { @@ -266,10 +260,10 @@ protected NettyTcpChannel initiateChannel(InetSocketAddress address, ActionListe } @Override - protected NettyTcpChannel bind(String name, InetSocketAddress address) { + protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) { Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel(); - NettyTcpChannel esChannel = new NettyTcpChannel(channel, name); - channel.attr(CHANNEL_KEY).set(esChannel); + Netty4TcpServerChannel esChannel = new Netty4TcpServerChannel(channel, name); + channel.attr(SERVER_CHANNEL_KEY).set(esChannel); return esChannel; } @@ -310,7 +304,7 @@ protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("logging", new ESLoggingHandler()); ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); // using a dot as a prefix means this cannot come from any settings parsed - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this)); } @Override @@ -331,11 +325,11 @@ protected ServerChannelInitializer(String name) { @Override protected void initChannel(Channel ch) throws Exception { addClosedExceptionLogger(ch); - NettyTcpChannel nettyTcpChannel = new NettyTcpChannel(ch, name); + Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name); ch.attr(CHANNEL_KEY).set(nettyTcpChannel); ch.pipeline().addLast("logging", new ESLoggingHandler()); ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this)); serverAcceptedChannel(nettyTcpChannel); } @@ -353,4 +347,19 @@ private void addClosedExceptionLogger(Channel channel) { } }); } + + @ChannelHandler.Sharable + private class ServerChannelExceptionHandler extends ChannelHandlerAdapter { + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Netty4Utils.maybeDie(cause); + Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get(); + if (cause instanceof Error) { + onServerException(serverChannel, new Exception(cause)); + } else { + onServerException(serverChannel, (Exception) cause); + } + } + } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index 7343da6c3b11a..4c783cf078769 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -70,7 +70,7 @@ public void startThreadPool() { nettyTransport.start(); TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses(); - TransportAddress transportAddress = (TransportAddress) randomFrom(boundAddresses); + TransportAddress transportAddress = randomFrom(boundAddresses); port = transportAddress.address().getPort(); host = transportAddress.address().getAddress(); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java new file mode 100644 index 0000000000000..2674d38dc490e --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerChannel.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.nio; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.http.HttpServerChannel; +import org.elasticsearch.nio.NioServerSocketChannel; + +import java.io.IOException; +import java.nio.channels.ServerSocketChannel; + +public class NioHttpServerChannel extends NioServerSocketChannel implements HttpServerChannel { + + NioHttpServerChannel(ServerSocketChannel serverSocketChannel) throws IOException { + super(serverSocketChannel); + } + + @Override + public void addCloseListener(ActionListener listener) { + addCloseListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public String toString() { + return "NioHttpServerChannel{localAddress=" + getLocalAddress() + "}"; + } +} diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index aa0859e6146f2..b80778e964293 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -21,40 +21,29 @@ import io.netty.handler.codec.http.HttpMethod; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.network.CloseableChannel; -import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; -import org.elasticsearch.http.BindHttpException; import org.elasticsearch.http.HttpChannel; +import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.HttpServerTransport; -import org.elasticsearch.http.HttpStats; import org.elasticsearch.http.nio.cors.NioCorsConfig; import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder; import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.InboundChannelBuffer; -import org.elasticsearch.nio.NioChannel; import org.elasticsearch.nio.NioGroup; import org.elasticsearch.nio.NioSelector; -import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.SocketChannelContext; @@ -62,18 +51,11 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.regex.Pattern; @@ -113,7 +95,6 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { private final int tcpSendBufferSize; private final int tcpReceiveBufferSize; - private final Set serverChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); private NioGroup nioGroup; private HttpChannelFactory channelFactory; private final NioCorsConfig corsConfig; @@ -156,12 +137,7 @@ protected void doStart() { daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount, (s) -> new EventHandler(this::onNonChannelException, s)); channelFactory = new HttpChannelFactory(); - this.boundAddress = createBoundHttpAddress(); - - if (logger.isInfoEnabled()) { - logger.info("{}", boundAddress); - } - + bindServer(); success = true; } catch (IOException e) { throw new ElasticsearchException(e); @@ -173,26 +149,7 @@ protected void doStart() { } @Override - protected void doStop() { - synchronized (serverChannels) { - if (serverChannels.isEmpty() == false) { - try { - closeChannels(new ArrayList<>(serverChannels)); - } catch (Exception e) { - logger.error("unexpected exception while closing http server channels", e); - } - serverChannels.clear(); - } - } - - // TODO: Move all of channel closing to abstract class once server channels are handled - try { - CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); - } catch (Exception e) { - logger.warn("unexpected exception while closing http channels", e); - } - httpChannels.clear(); - + protected void stopInternal() { try { nioGroup.close(); } catch (Exception e) { @@ -201,40 +158,8 @@ protected void doStop() { } @Override - protected void doClose() throws IOException { - } - - @Override - protected TransportAddress bindAddress(InetAddress hostAddress) { - final AtomicReference lastException = new AtomicReference<>(); - final AtomicReference boundSocket = new AtomicReference<>(); - boolean success = port.iterate(portNumber -> { - try { - synchronized (serverChannels) { - InetSocketAddress address = new InetSocketAddress(hostAddress, portNumber); - NioServerSocketChannel channel = nioGroup.bindServerChannel(address, channelFactory); - serverChannels.add(channel); - boundSocket.set(channel.getLocalAddress()); - } - } catch (Exception e) { - lastException.set(e); - return false; - } - return true; - }); - if (success == false) { - throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get()); - } - - if (logger.isDebugEnabled()) { - logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get())); - } - return new TransportAddress(boundSocket.get()); - } - - @Override - public HttpStats stats() { - return new HttpStats(serverChannels.size(), totalChannelsAccepted.get()); + protected HttpServerChannel bind(InetSocketAddress socketAddress) throws IOException { + return nioGroup.bindServerChannel(socketAddress, channelFactory); } static NioCorsConfig buildCorsConfig(Settings settings) { @@ -269,33 +194,11 @@ static NioCorsConfig buildCorsConfig(Settings settings) { .build(); } - private void closeChannels(List channels) { - List> futures = new ArrayList<>(channels.size()); - - for (NioChannel channel : channels) { - PlainActionFuture future = PlainActionFuture.newFuture(); - channel.addCloseListener(ActionListener.toBiConsumer(future)); - futures.add(future); - channel.close(); - } - - List closeExceptions = new ArrayList<>(); - for (ActionFuture f : futures) { - try { - f.actionGet(); - } catch (RuntimeException e) { - closeExceptions.add(e); - } - } - - ExceptionsHelper.rethrowAndSuppress(closeExceptions); - } - private void acceptChannel(NioSocketChannel socketChannel) { super.serverAcceptedChannel((HttpChannel) socketChannel); } - private class HttpChannelFactory extends ChannelFactory { + private class HttpChannelFactory extends ChannelFactory { private HttpChannelFactory() { super(new RawChannelFactory(tcpNoDelay, tcpKeepAlive, reuseAddress, tcpSendBufferSize, tcpReceiveBufferSize)); @@ -303,29 +206,28 @@ private HttpChannelFactory() { @Override public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { - NioHttpChannel nioChannel = new NioHttpChannel(channel); + NioHttpChannel httpChannel = new NioHttpChannel(channel); java.util.function.Supplier pageSupplier = () -> { Recycler.V bytes = pageCacheRecycler.bytePage(false); return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; - HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this, + HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(httpChannel,NioHttpServerTransport.this, handlingSettings, corsConfig); - Consumer exceptionHandler = (e) -> onException(nioChannel, e); - SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline, + Consumer exceptionHandler = (e) -> onException(httpChannel, e); + SocketChannelContext context = new BytesChannelContext(httpChannel, selector, exceptionHandler, httpReadWritePipeline, new InboundChannelBuffer(pageSupplier)); - nioChannel.setContext(context); - return nioChannel; + httpChannel.setContext(context); + return httpChannel; } @Override - public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { - NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel); - Consumer exceptionHandler = (e) -> logger.error(() -> - new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + public NioHttpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { + NioHttpServerChannel httpServerChannel = new NioHttpServerChannel(channel); + Consumer exceptionHandler = (e) -> onServerException(httpServerChannel, e); Consumer acceptor = NioHttpServerTransport.this::acceptChannel; - ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); - nioChannel.setContext(context); - return nioChannel; + ServerChannelContext context = new ServerChannelContext(httpServerChannel, this, selector, acceptor, exceptionHandler); + httpServerChannel.setContext(context); + return httpServerChannel; } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java index 10bf4ed752321..3c6d4b12df943 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java @@ -20,19 +20,17 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.nio.NioServerSocketChannel; -import org.elasticsearch.transport.TcpChannel; +import org.elasticsearch.transport.TcpServerChannel; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; /** - * This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpChannel} + * This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpServerChannel} * interface. As it is a server socket, setting SO_LINGER and sending messages is not supported. */ -public class NioTcpServerChannel extends NioServerSocketChannel implements TcpChannel { +public class NioTcpServerChannel extends NioServerSocketChannel implements TcpServerChannel { private final String profile; @@ -41,21 +39,6 @@ public NioTcpServerChannel(String profile, ServerSocketChannel socketChannel) th this.profile = profile; } - @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - throw new UnsupportedOperationException("Cannot send a message to a server channel."); - } - - @Override - public void setSoLinger(int value) throws IOException { - throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel."); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return null; - } - @Override public void close() { getContext().closeChannel(); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index cf7d37493cb38..47229a0df2f6e 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -19,7 +19,6 @@ package org.elasticsearch.transport.nio; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -176,8 +175,7 @@ public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) @Override public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel); - Consumer exceptionHandler = (e) -> logger.error(() -> - new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + Consumer exceptionHandler = (e) -> onServerException(nioChannel, e); Consumer acceptor = NioTransport.this::acceptChannel; ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); nioChannel.setContext(context); diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 9d9008f7fb879..622020d6451db 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.network.CloseableChannel; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; @@ -53,6 +54,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; @@ -74,9 +76,10 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo private final String[] bindHosts; private final String[] publishHosts; - protected final AtomicLong totalChannelsAccepted = new AtomicLong(); - protected final Set httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); - protected volatile BoundTransportAddress boundAddress; + private volatile BoundTransportAddress boundAddress; + private final AtomicLong totalChannelsAccepted = new AtomicLong(); + private final Set httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) { @@ -116,7 +119,12 @@ public HttpInfo info() { return new HttpInfo(boundTransportAddress, maxContentLength.getBytes()); } - protected BoundTransportAddress createBoundHttpAddress() { + @Override + public HttpStats stats() { + return new HttpStats(httpChannels.size(), totalChannelsAccepted.get()); + } + + protected void bindServer() { // Bind and start to accept incoming connections. InetAddress hostAddresses[]; try { @@ -138,11 +146,71 @@ protected BoundTransportAddress createBoundHttpAddress() { } final int publishPort = resolvePublishPort(settings, boundAddresses, publishInetAddress); - final InetSocketAddress publishAddress = new InetSocketAddress(publishInetAddress, publishPort); - return new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), new TransportAddress(publishAddress)); + TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort)); + this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), publishAddress); + logger.info("{}", boundAddress); + } + + private TransportAddress bindAddress(final InetAddress hostAddress) { + final AtomicReference lastException = new AtomicReference<>(); + final AtomicReference boundSocket = new AtomicReference<>(); + boolean success = port.iterate(portNumber -> { + try { + synchronized (httpServerChannels) { + HttpServerChannel httpServerChannel = bind(new InetSocketAddress(hostAddress, portNumber)); + httpServerChannels.add(httpServerChannel); + boundSocket.set(httpServerChannel.getLocalAddress()); + } + } catch (Exception e) { + lastException.set(e); + return false; + } + return true; + }); + if (!success) { + throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get()); + } + + if (logger.isDebugEnabled()) { + logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get())); + } + return new TransportAddress(boundSocket.get()); + } + + protected abstract HttpServerChannel bind(InetSocketAddress hostAddress) throws Exception; + + @Override + protected void doStop() { + synchronized (httpServerChannels) { + if (httpServerChannels.isEmpty() == false) { + try { + CloseableChannel.closeChannels(new ArrayList<>(httpServerChannels), true); + } catch (Exception e) { + logger.warn("exception while closing channels", e); + } finally { + httpServerChannels.clear(); + } + } + } + + try { + CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); + } catch (Exception e) { + logger.warn("unexpected exception while closing http channels", e); + } + httpChannels.clear(); + + stopInternal(); } - protected abstract TransportAddress bindAddress(InetAddress hostAddress); + @Override + protected void doClose() { + } + + /** + * Called to tear down internal resources + */ + protected abstract void stopInternal(); // package private for tests static int resolvePublishPort(Settings settings, List boundAddresses, InetAddress publishInetAddress) { @@ -197,19 +265,23 @@ protected void onException(HttpChannel channel, Exception e) { CloseableChannel.closeChannel(channel); } else { logger.warn(() -> new ParameterizedMessage( - "caught exception while handling client http traffic, closing connection {}", channel), e); + "caught exception while handling client http traffic, closing connection {}", channel), e); CloseableChannel.closeChannel(channel); } } + protected void onServerException(HttpServerChannel channel, Exception e) { + logger.error(new ParameterizedMessage("exception from http server channel caught on transport layer [channel={}]", channel), e); + } + /** * Exception handler for exceptions that are not associated with a specific channel. * * @param exception the exception */ protected void onNonChannelException(Exception exception) { - logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), - exception); + String threadName = Thread.currentThread().getName(); + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", threadName), exception); } protected void serverAcceptedChannel(HttpChannel httpChannel) { diff --git a/server/src/main/java/org/elasticsearch/http/HttpServerChannel.java b/server/src/main/java/org/elasticsearch/http/HttpServerChannel.java new file mode 100644 index 0000000000000..e4222ae816806 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/http/HttpServerChannel.java @@ -0,0 +1,34 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http; + +import org.elasticsearch.common.network.CloseableChannel; + +import java.net.InetSocketAddress; + +public interface HttpServerChannel extends CloseableChannel { + + /** + * Returns the local address for this channel. + * + * @return the local address of this channel. + */ + InetSocketAddress getLocalAddress(); +} diff --git a/server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java new file mode 100644 index 0000000000000..408ec1af20b96 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/TcpServerChannel.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.network.CloseableChannel; + +import java.net.InetSocketAddress; + + +/** + * This is a tcp channel representing a server channel listening for new connections. It is the server + * channel abstraction used by the {@link TcpTransport} and {@link TransportService}. All tcp transport + * implementations must return server channels that adhere to the required method contracts. + */ +public interface TcpServerChannel extends CloseableChannel { + + /** + * This returns the profile for this channel. + */ + String getProfile(); + + /** + * Returns the local address for this channel. + * + * @return the local address of this channel. + */ + InetSocketAddress getLocalAddress(); + +} diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index bd862c19e9c6d..c8f256c2db89a 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -21,9 +21,6 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.network.CloseableChannel; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; @@ -31,6 +28,7 @@ import org.elasticsearch.action.NotifyOnceListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -52,6 +50,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; @@ -68,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.rest.RestStatus; @@ -210,7 +210,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final ConcurrentMap profileBoundAddresses = newConcurrentMap(); // node id to actual channel private final ConcurrentMap connectedNodes = newConcurrentMap(); - private final Map> serverChannels = newConcurrentMap(); + private final Map> serverChannels = newConcurrentMap(); private final Set acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final KeyedLock connectionLock = new KeyedLock<>(); @@ -792,9 +792,9 @@ protected InetSocketAddress bindToPort(final String name, final InetAddress host final AtomicReference boundSocket = new AtomicReference<>(); boolean success = portsRange.iterate(portNumber -> { try { - TcpChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); + TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); synchronized (serverChannels) { - List list = serverChannels.get(name); + List list = serverChannels.get(name); if (list == null) { list = new ArrayList<>(); serverChannels.put(name, list); @@ -957,9 +957,9 @@ protected final void doStop() { closeLock.writeLock().lock(); try { // first stop to accept any incoming connections so nobody can connect to this transport - for (Map.Entry> entry : serverChannels.entrySet()) { + for (Map.Entry> entry : serverChannels.entrySet()) { String profile = entry.getKey(); - List channels = entry.getValue(); + List channels = entry.getValue(); ActionListener closeFailLogger = ActionListener.wrap(c -> {}, e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e)); channels.forEach(c -> c.addCloseListener(closeFailLogger)); @@ -999,7 +999,7 @@ protected final void doStop() { } } - protected void onException(TcpChannel channel, Exception e) { + public void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); @@ -1049,6 +1049,10 @@ protected void innerOnFailure(Exception e) { } } + protected void onServerException(TcpServerChannel channel, Exception e) { + logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [channel={}]", channel), e); + } + /** * Exception handler for exceptions that are not associated with a specific channel. * @@ -1072,7 +1076,7 @@ protected void serverAcceptedChannel(TcpChannel channel) { * @param name the profile name * @param address the address to bind to */ - protected abstract TcpChannel bind(String name, InetSocketAddress address) throws IOException; + protected abstract TcpServerChannel bind(String name, InetSocketAddress address) throws IOException; /** * Initiate a single tcp socket channel. @@ -1087,8 +1091,7 @@ protected void serverAcceptedChannel(TcpChannel channel) { /** * Called to tear down internal resources */ - protected void stopInternal() { - } + protected abstract void stopInternal(); public boolean canCompress(TransportRequest request) { return compress && (!(request instanceof BytesTransportRequest)); diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index a7629e5f48b6c..ece9fd503c1ce 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -35,8 +35,7 @@ import org.junit.After; import org.junit.Before; -import java.io.IOException; -import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; @@ -128,8 +127,9 @@ public void dispatchBadRequest(final RestRequest request, try (AbstractHttpServerTransport transport = new AbstractHttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher) { + @Override - protected TransportAddress bindAddress(InetAddress hostAddress) { + protected HttpServerChannel bind(InetSocketAddress hostAddress) { return null; } @@ -139,12 +139,7 @@ protected void doStart() { } @Override - protected void doStop() { - - } - - @Override - protected void doClose() throws IOException { + protected void stopInternal() { } diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 2328aa4636361..d16300bf266d6 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -193,6 +193,10 @@ protected FakeChannel initiateChannel(InetSocketAddress address, ActionListener< return new FakeChannel(messageCaptor); } + @Override + protected void stopInternal() { + } + @Override public NodeChannels getConnection(DiscoveryNode node) { int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections(); @@ -237,7 +241,7 @@ public NodeChannels getConnection(DiscoveryNode node) { } } - private static final class FakeChannel implements TcpChannel { + private static final class FakeChannel implements TcpChannel, TcpServerChannel { private final AtomicReference messageCaptor; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 8831c46c01136..bbff340c86011 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -225,7 +225,7 @@ private void configureSocket(Socket socket) throws SocketException { socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings)); } - public final class MockChannel implements Closeable, TcpChannel { + public final class MockChannel implements Closeable, TcpChannel, TcpServerChannel { private final AtomicBoolean isOpen = new AtomicBoolean(true); private final InetSocketAddress localAddress; private final ServerSocket serverSocket; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index cb9e243660a8e..2ab8719c33422 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -41,6 +41,7 @@ import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpChannel; +import org.elasticsearch.transport.TcpServerChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transports; @@ -191,7 +192,7 @@ public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException { } } - private static class MockServerChannel extends NioServerSocketChannel implements TcpChannel { + private static class MockServerChannel extends NioServerSocketChannel implements TcpServerChannel { private final String profile; @@ -215,21 +216,6 @@ public String getProfile() { public void addCloseListener(ActionListener listener) { addCloseListener(ActionListener.toBiConsumer(listener)); } - - @Override - public void setSoLinger(int value) throws IOException { - throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel."); - } - - @Override - public InetSocketAddress getRemoteAddress() { - return null; - } - - @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - throw new UnsupportedOperationException("Cannot send a message to a server channel."); - } } private static class MockSocketChannel extends NioSocketChannel implements TcpChannel { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index ce06712722cd1..b761439b15b6a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -109,7 +109,7 @@ protected ChannelHandler getClientChannelInitializer() { } @Override - protected void onException(TcpChannel channel, Exception e) { + public void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java index 161ac3678aeab..9427812ba1349 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/ServerTransportFilter.java @@ -24,7 +24,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.netty4.NettyTcpChannel; +import org.elasticsearch.transport.netty4.Netty4TcpChannel; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.user.KibanaUser; @@ -116,8 +116,8 @@ requests from all the nodes are attached with a user (either a serialize } if (extractClientCert && (unwrappedChannel instanceof TcpTransportChannel) && - ((TcpTransportChannel) unwrappedChannel).getChannel() instanceof NettyTcpChannel) { - Channel channel = ((NettyTcpChannel) ((TcpTransportChannel) unwrappedChannel).getChannel()).getLowLevelChannel(); + ((TcpTransportChannel) unwrappedChannel).getChannel() instanceof Netty4TcpChannel) { + Channel channel = ((Netty4TcpChannel) ((TcpTransportChannel) unwrappedChannel).getChannel()).getLowLevelChannel(); SslHandler sslHandler = channel.pipeline().get(SslHandler.class); if (channel.isOpen()) { assert sslHandler != null : "channel [" + channel + "] did not have a ssl handler. pipeline " + channel.pipeline(); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index 5315a944f778d..fd1b1198607d1 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.security.transport.nio; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; @@ -131,9 +130,8 @@ public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) @Override public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { - NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel); - Consumer exceptionHandler = (e) -> logger.error(() -> - new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);; + Consumer exceptionHandler = (e) -> onServerException(nioChannel, e); Consumer acceptor = SecurityNioTransport.this::acceptChannel; ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); nioChannel.setContext(context);