Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3dc8eb1
WIP
Tim-Brooks Nov 8, 2018
8a5f887
WIP
Tim-Brooks Nov 9, 2018
f60d26a
Merge remote-tracking branch 'upstream/master' into keep_alive_changes
Tim-Brooks Nov 9, 2018
29cd642
WIP
Tim-Brooks Nov 10, 2018
fe707d8
WIP
Tim-Brooks Nov 10, 2018
67ec991
WIP
Tim-Brooks Nov 11, 2018
a234ae1
WIP
Tim-Brooks Nov 12, 2018
d2e43ee
Finish unit test
Tim-Brooks Nov 12, 2018
2ae773a
WIP
Tim-Brooks Nov 12, 2018
a3cefcc
WIP
Tim-Brooks Nov 12, 2018
57f371e
WIP
Tim-Brooks Nov 12, 2018
ead5531
WIP
Tim-Brooks Nov 12, 2018
3196aae
Fix issues
Tim-Brooks Nov 12, 2018
1761b75
Fix tests
Tim-Brooks Nov 12, 2018
df93fc5
Merge remote-tracking branch 'upstream/master' into keep_alive_changes
Tim-Brooks Nov 12, 2018
63c65a4
Close keep alive
Tim-Brooks Nov 12, 2018
b66f81a
Merge remote-tracking branch 'upstream/master' into keep_alive_changes
Tim-Brooks Nov 12, 2018
5e3a156
Merge remote-tracking branch 'upstream/master' into keep_alive_changes
Tim-Brooks Nov 12, 2018
8bab5a9
Merge remote-tracking branch 'upstream/master' into keep_alive_changes
Tim-Brooks Nov 13, 2018
8564eee
Changes from review
Tim-Brooks Nov 13, 2018
53da572
Merge remote-tracking branch 'upstream/master' into keep_alive_changes
Tim-Brooks Nov 13, 2018
d7e920f
Add async bi consumer
Tim-Brooks Nov 15, 2018
55ba446
Merge remote-tracking branch 'upstream/master' into keep_alive_changes
Tim-Brooks Nov 15, 2018
3320bbe
Changes from review
Tim-Brooks Nov 19, 2018
ac7cdae
checkstyle
Tim-Brooks Nov 19, 2018
135f74a
Remove assertion
Tim-Brooks Nov 19, 2018
8a250c6
Merge remote-tracking branch 'upstream/master' into keep_alive_changes
Tim-Brooks Nov 20, 2018
cdd1ec3
Merge remote-tracking branch 'upstream/master' into keep_alive_changes
Tim-Brooks Nov 20, 2018
01b74dc
Merge remote-tracking branch 'upstream/master' into keep_alive_changes
Tim-Brooks Nov 28, 2018
11a7167
Changes from review
Tim-Brooks Nov 28, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
public class Netty4TcpChannel implements TcpChannel {

private final Channel channel;
private final boolean isServer;
private final String profile;
private final CompletableContext<Void> connectContext;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
private final ChannelStats stats = new ChannelStats();

Netty4TcpChannel(Channel channel, String profile, @Nullable ChannelFuture connectFuture) {
Netty4TcpChannel(Channel channel, boolean isServer, String profile, @Nullable ChannelFuture connectFuture) {
this.channel = channel;
this.isServer = isServer;
this.profile = profile;
this.connectContext = new CompletableContext<>();
this.channel.closeFuture().addListener(f -> {
Expand Down Expand Up @@ -77,6 +80,11 @@ public void close() {
channel.close();
}

@Override
public boolean isServerChannel() {
return isServer;
}

@Override
public String getProfile() {
return profile;
Expand All @@ -92,6 +100,11 @@ public void addConnectListener(ActionListener<Void> listener) {
connectContext.addListener(ActionListener.toBiConsumer(listener));
}

@Override
public ChannelStats getChannelStats() {
return stats;
}

@Override
public boolean isOpen() {
return channel.isOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOExceptio
}
addClosedExceptionLogger(channel);

Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default", connectFuture);
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, false, "default", connectFuture);
channel.attr(CHANNEL_KEY).set(nettyChannel);

return nettyChannel;
Expand All @@ -246,14 +246,6 @@ protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
return esChannel;
}

long successfulPingCount() {
return successfulPings.count();
}

long failedPingCount() {
return failedPings.count();
}

@Override
@SuppressForbidden(reason = "debug")
protected void stopInternal() {
Expand Down Expand Up @@ -297,7 +289,7 @@ protected ServerChannelInitializer(String name) {
@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name, ch.newSucceededFuture());
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,25 @@

public class NioTcpChannel extends NioSocketChannel implements TcpChannel {

private final boolean isServer;
private final String profile;
private final ChannelStats stats = new ChannelStats();

public NioTcpChannel(String profile, SocketChannel socketChannel) {
public NioTcpChannel(boolean isServer, String profile, SocketChannel socketChannel) {
super(socketChannel);
this.isServer = isServer;
this.profile = profile;
}

public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
}

@Override
public boolean isServerChannel() {
return isServer;
}

@Override
public String getProfile() {
return profile;
Expand All @@ -54,6 +62,11 @@ public void addConnectListener(ActionListener<Void> listener) {
addConnectListener(ActionListener.toBiConsumer(listener));
}

@Override
public ChannelStats getChannelStats() {
return stats;
}

@Override
public void close() {
getContext().closeChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;

public class NioTransport extends TcpTransport {

private static final Logger logger = LogManager.getLogger(NioTransport.class);

public static final Setting<Integer> NIO_WORKER_COUNT =
Expand Down Expand Up @@ -135,11 +136,11 @@ protected void acceptChannel(NioSocketChannel channel) {
}

protected TcpChannelFactory serverChannelFactory(ProfileSettings profileSettings) {
return new TcpChannelFactoryImpl(profileSettings);
return new TcpChannelFactoryImpl(profileSettings, false);
}

protected Function<DiscoveryNode, TcpChannelFactory> clientChannelFactoryFunction(ProfileSettings profileSettings) {
return (n) -> new TcpChannelFactoryImpl(profileSettings);
return (n) -> new TcpChannelFactoryImpl(profileSettings, true);
}

protected abstract class TcpChannelFactory extends ChannelFactory<NioTcpServerChannel, NioTcpChannel> {
Expand All @@ -151,20 +152,22 @@ protected TcpChannelFactory(RawChannelFactory rawChannelFactory) {

private class TcpChannelFactoryImpl extends TcpChannelFactory {

private final boolean isClient;
private final String profileName;

private TcpChannelFactoryImpl(ProfileSettings profileSettings) {
private TcpChannelFactoryImpl(ProfileSettings profileSettings, boolean isClient) {
super(new RawChannelFactory(profileSettings.tcpNoDelay,
profileSettings.tcpKeepAlive,
profileSettings.reuseAddress,
Math.toIntExact(profileSettings.sendBufferSize.getBytes()),
Math.toIntExact(profileSettings.receiveBufferSize.getBytes())));
this.isClient = isClient;
this.profileName = profileSettings.profileName;
}

@Override
public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
NioTcpChannel nioChannel = new NioTcpChannel(profileName, channel);
public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) {
NioTcpChannel nioChannel = new NioTcpChannel(isClient == false, profileName, channel);
Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
Expand All @@ -178,7 +181,7 @@ public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel)
}

@Override
public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) {
NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> onServerException(nioChannel, e);
Consumer<NioSocketChannel> acceptor = NioTransport.this::acceptChannel;
Expand Down
29 changes: 29 additions & 0 deletions server/src/main/java/org/elasticsearch/common/AsyncBiFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;

import org.elasticsearch.action.ActionListener;

/**
* A {@link java.util.function.BiFunction}-like interface designed to be used with asynchronous executions.
*/
public interface AsyncBiFunction<T,U,C> {

void apply(T t, U u, ActionListener<C> listener);
}
Loading