From aaf7f5b9267818f03025062c851cf1efebd19e26 Mon Sep 17 00:00:00 2001 From: adeneche Date: Sat, 26 Mar 2016 07:43:28 +0100 Subject: [PATCH 1/2] DRILL-3317: when ProtobufLengthDecoder couldn't allocate a new DrillBuf, this error is just logged and nothing else is done --- .../drill/exec/rpc/control/ControlClient.java | 3 +- .../control/ControlProtobufLengthDecoder.java | 5 ++- .../drill/exec/rpc/control/ControlServer.java | 5 ++- .../drill/exec/rpc/data/DataClient.java | 5 ++- .../rpc/data/DataProtobufLengthDecoder.java | 25 +++++++-------- .../drill/exec/rpc/data/DataServer.java | 15 ++------- .../drill/exec/rpc/user/UserClient.java | 3 +- .../rpc/user/UserProtobufLengthDecoder.java | 7 ++--- .../drill/exec/rpc/user/UserServer.java | 5 ++- .../apache/drill/exec/rpc/BasicServer.java | 8 ++--- .../drill/exec/rpc/OutOfMemoryHandler.java | 31 ------------------- .../drill/exec/rpc/ProtobufLengthDecoder.java | 15 ++------- 12 files changed, 31 insertions(+), 96 deletions(-) delete mode 100644 exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java index c5bf6b5cd60..0fdf55d6506 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java @@ -27,7 +27,6 @@ import org.apache.drill.exec.proto.BitControl.RpcType; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.rpc.BasicClient; -import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.RpcBus; @@ -111,7 +110,7 @@ public ControlConnection getConnection() { @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { - return new ControlProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); + return new ControlProtobufLengthDecoder(allocator); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java index 36573b53e39..db16e3f7cdb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java @@ -23,15 +23,14 @@ import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; /** * Purely to simplify memory debugging. */ public class ControlProtobufLengthDecoder extends ProtobufLengthDecoder{ - public ControlProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { - super(allocator, outOfMemoryHandler); + public ControlProtobufLengthDecoder(BufferAllocator allocator) { + super(allocator); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java index a7864692935..b4a818c2862 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java @@ -26,7 +26,6 @@ import org.apache.drill.exec.proto.BitControl.BitControlHandshake; import org.apache.drill.exec.proto.BitControl.RpcType; import org.apache.drill.exec.rpc.BasicServer; -import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.RpcException; @@ -107,8 +106,8 @@ public MessageLite getHandshakeResponse(BitControlHandshake inbound) throws Exce } @Override - public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { - return new ControlProtobufLengthDecoder(allocator, outOfMemoryHandler); + public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { + return new ControlProtobufLengthDecoder(allocator); } private class ProxyCloseHandler implements GenericFutureListener { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java index 9db551b4620..fae996c366d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java @@ -28,7 +28,6 @@ import org.apache.drill.exec.proto.BitData.RpcType; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.rpc.BasicClient; -import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.RpcException; @@ -38,7 +37,7 @@ public class DataClient extends BasicClient{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClient.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClient.class); private volatile DataClientConnection connection; private final BufferAllocator allocator; @@ -100,6 +99,6 @@ public DataClientConnection getConnection() { @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { - return new DataProtobufLengthDecoder.Client(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); + return new DataProtobufLengthDecoder.Client(allocator); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java index a74a5a7e250..3a5b816666c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java @@ -23,27 +23,26 @@ import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -public class DataProtobufLengthDecoder{ +public class DataProtobufLengthDecoder { - public static class Client extends ProtobufLengthDecoder{ - public Client(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { - super(allocator, outOfMemoryHandler); + public static class Client extends ProtobufLengthDecoder { + public Client(BufferAllocator allocator) { + super(allocator); - } + } - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - super.decode(ctx, in, out); - } + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + super.decode(ctx, in, out); + } } - public static class Server extends ProtobufLengthDecoder{ + public static class Server extends ProtobufLengthDecoder { - public Server(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { - super(allocator, outOfMemoryHandler); + public Server(BufferAllocator allocator) { + super(allocator); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index 03118d754b9..e186136c47b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -37,7 +37,6 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.BasicServer; -import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.ResponseSender; @@ -191,18 +190,8 @@ public void operationComplete(ChannelFuture future) throws Exception { } @Override - public OutOfMemoryHandler getOutOfMemoryHandler() { - return new OutOfMemoryHandler() { - @Override - public void handle() { - logger.error("Out of memory in RPC layer."); - } - }; - } - - @Override - public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { - return new DataProtobufLengthDecoder.Server(allocator, outOfMemoryHandler); + public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { + return new DataProtobufLengthDecoder.Server(allocator); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 824e6eb63f9..784084b2003 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -39,7 +39,6 @@ import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.BasicClientWithConnection; import org.apache.drill.exec.rpc.ConnectionThrottle; -import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.RpcConnectionHandler; @@ -134,6 +133,6 @@ protected void finalizeConnection(BitToUserHandshake handshake, BasicClientWithC @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { - return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); + return new UserProtobufLengthDecoder(allocator); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java index 266f112f8ca..fc443610a57 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java @@ -23,13 +23,12 @@ import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -public class UserProtobufLengthDecoder extends ProtobufLengthDecoder{ +public class UserProtobufLengthDecoder extends ProtobufLengthDecoder { - public UserProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { - super(allocator, outOfMemoryHandler); + public UserProtobufLengthDecoder(BufferAllocator allocator) { + super(allocator); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 7e90747bb9f..a28c801e66f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -45,7 +45,6 @@ import org.apache.drill.exec.proto.UserProtos.UserProperties; import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; import org.apache.drill.exec.rpc.BasicServer; -import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.OutboundRpcMessage; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; import org.apache.drill.exec.rpc.RemoteConnection; @@ -287,8 +286,8 @@ private static BitToUserHandshake handleFailure(BitToUserHandshake.Builder respB } @Override - public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { - return new UserProtobufLengthDecoder(allocator, outOfMemoryHandler); + public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { + return new UserProtobufLengthDecoder(allocator); } @Override diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java index 27364afb852..567542679c6 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java @@ -81,7 +81,7 @@ protected void initChannel(SocketChannel ch) throws Exception { ch.closeFuture().addListener(getCloseHandler(ch, connection)); final ChannelPipeline pipe = ch.pipeline(); - pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator(), getOutOfMemoryHandler())); + pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator())); pipe.addLast("message-decoder", new RpcDecoder("s-" + rpcConfig.getName())); pipe.addLast("protocol-encoder", new RpcEncoder("s-" + rpcConfig.getName())); pipe.addLast("handshake-handler", getHandshakeHandler(connection)); @@ -123,15 +123,11 @@ protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { } - public OutOfMemoryHandler getOutOfMemoryHandler() { - return OutOfMemoryHandler.DEFAULT_INSTANCE; - } - protected void removeTimeoutHandler() { } - public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler); + public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator); @Override public boolean isClient() { diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java deleted file mode 100644 index 5d7db478e36..00000000000 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.rpc; - -public interface OutOfMemoryHandler { - - public static OutOfMemoryHandler DEFAULT_INSTANCE = new OutOfMemoryHandler() { - @Override - public void handle() { - throw new UnsupportedOperationException(); - } - }; - - public void handle(); - -} diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java index 3dfe03f5eab..33af557bea6 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java @@ -27,7 +27,6 @@ import org.apache.drill.exec.memory.BufferAllocator; import com.google.protobuf.CodedInputStream; -import org.apache.drill.exec.exception.OutOfMemoryException; /** * Modified version of {@link io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder} that avoids bytebuf copy. @@ -37,12 +36,10 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtobufLengthDecoder.class); private BufferAllocator allocator; - private OutOfMemoryHandler outOfMemoryHandler; - public ProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { + public ProtobufLengthDecoder(BufferAllocator allocator) { super(); this.allocator = allocator; - this.outOfMemoryHandler = outOfMemoryHandler; } @@ -82,15 +79,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } else { // need to make buffer copy, otherwise netty will try to refill this buffer if we move the readerIndex forward... // TODO: Can we avoid this copy? - ByteBuf outBuf; - try { - outBuf = allocator.buffer(length); - } catch (OutOfMemoryException e) { - logger.warn("Failure allocating buffer on incoming stream due to memory limits. Current Allocation: {}.", allocator.getAllocatedMemory()); - in.resetReaderIndex(); - outOfMemoryHandler.handle(); - return; - } + ByteBuf outBuf = allocator.buffer(length); outBuf.writeBytes(in, in.readerIndex(), length); in.skipBytes(length); From 46ed7716b8496ba4b1cc5ac2473a35c7e2738abd Mon Sep 17 00:00:00 2001 From: adeneche Date: Wed, 20 Apr 2016 15:52:03 -0700 Subject: [PATCH 2/2] use rpc allocator instead of root allocator --- .../org/apache/drill/exec/rpc/control/ControlServer.java | 9 +++++---- .../apache/drill/exec/rpc/control/ControllerImpl.java | 4 +++- .../java/org/apache/drill/exec/rpc/data/DataClient.java | 9 +++++---- .../drill/exec/rpc/data/DataConnectionCreator.java | 2 +- .../drill/exec/rpc/data/DataConnectionManager.java | 7 +++++-- .../java/org/apache/drill/exec/rpc/data/DataServer.java | 9 ++++----- .../java/org/apache/drill/exec/server/TestBitRpc.java | 7 ++----- 7 files changed, 25 insertions(+), 22 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java index b4a818c2862..d39425352a8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java @@ -40,16 +40,17 @@ public class ControlServer extends BasicServer{ private final ControlMessageHandler handler; private final ConnectionManagerRegistry connectionRegistry; private volatile ProxyCloseHandler proxyCloseHandler; - private BufferAllocator allocator; + private BufferAllocator controlAllocator; - public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) { + public ControlServer(ControlMessageHandler handler, BootStrapContext context, + ConnectionManagerRegistry connectionRegistry, BufferAllocator allocator) { super( ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()), context.getAllocator().getAsByteBufAllocator(), context.getBitLoopGroup()); this.handler = handler; this.connectionRegistry = connectionRegistry; - this.allocator = context.getAllocator(); + this.controlAllocator = allocator; } @Override @@ -71,7 +72,7 @@ protected GenericFutureListener getCloseHandler(SocketChannel ch, @Override public ControlConnection initRemoteConnection(SocketChannel channel) { super.initRemoteConnection(channel); - return new ControlConnection("control server", channel, this, allocator); + return new ControlConnection("control server", channel, this, controlAllocator); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java index 482f1174f9e..cb7b238a921 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java @@ -44,6 +44,7 @@ public class ControllerImpl implements Controller { private final ConnectionManagerRegistry connectionRegistry; private final boolean allowPortHunting; private final CustomHandlerRegistry handlerRegistry; + private final BufferAllocator controlAllocator; public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, BufferAllocator allocator, boolean allowPortHunting) { @@ -53,11 +54,12 @@ public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, B this.connectionRegistry = new ConnectionManagerRegistry(allocator, handler, context); this.allowPortHunting = allowPortHunting; this.handlerRegistry = handler.getHandlerRegistry(); + controlAllocator = allocator; } @Override public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException { - server = new ControlServer(handler, context, connectionRegistry); + server = new ControlServer(handler, context, connectionRegistry, controlAllocator); int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT); port = server.bind(port, allowPortHunting); DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java index fae996c366d..2d556089123 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java @@ -40,11 +40,12 @@ public class DataClient extends BasicClient { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class); private volatile ProxyCloseHandler proxyCloseHandler; - private final BootStrapContext context; private final WorkEventBus workBus; private final WorkerBee bee; + private final BufferAllocator dataAllocator; - public DataServer(BootStrapContext context, BufferAllocator alloc, WorkEventBus workBus, - WorkerBee bee) { + public DataServer(BootStrapContext context, BufferAllocator alloc, WorkEventBus workBus, WorkerBee bee) { super( DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()), alloc.getAsByteBufAllocator(), context.getBitLoopGroup()); - this.context = context; this.workBus = workBus; this.bee = bee; + this.dataAllocator = alloc; } @Override @@ -81,7 +80,7 @@ protected GenericFutureListener getCloseHandler(SocketChannel ch, @Override public BitServerConnection initRemoteConnection(SocketChannel channel) { super.initRemoteConnection(channel); - return new BitServerConnection(channel, context.getAllocator()); + return new BitServerConnection(channel, dataAllocator); } @Override diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java index 10656f437bc..17c70662989 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java @@ -31,8 +31,6 @@ import mockit.NonStrictExpectations; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.expression.ExpressionPosition; -import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.scanner.ClassPathScanner; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; @@ -66,7 +64,7 @@ import com.google.common.collect.Lists; public class TestBitRpc extends ExecTest { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class); @Test public void testConnectionBackpressure(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception { @@ -121,7 +119,7 @@ public FragmentContext getFragmentContext(){ port = server.bind(port, true); DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build(); - DataConnectionManager manager = new DataConnectionManager(ep, c2); + DataConnectionManager manager = new DataConnectionManager(ep, c2, c.getAllocator()); DataTunnel tunnel = new DataTunnel(manager); AtomicLong max = new AtomicLong(0); for (int i = 0; i < 40; i++) { @@ -129,7 +127,6 @@ public FragmentContext getFragmentContext(){ tunnel.sendRecordBatch(new TimingOutcome(max), new FragmentWritableBatch(false, QueryId.getDefaultInstance(), 1, 1, 1, 1, getRandomBatch(c.getAllocator(), 5000))); System.out.println(System.currentTimeMillis() - t1); - // System.out.println("sent."); } System.out.println(String.format("Max time: %d", max.get())); assertTrue(max.get() > 2700);