From 107c17c5cceeb7c22fb02eafe2e77bc9304d0d37 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Fri, 8 Jan 2016 00:49:44 -0500 Subject: [PATCH] ACCUMULO-4095 Hacks on CustomNonBlockingServer to restore client address functionality. --- .../server/rpc/CustomNonBlockingServer.java | 57 ---------- .../accumulo/server/rpc/TServerUtils.java | 1 + .../server/CustomNonBlockingServer.java | 103 ++++++++++++++++++ 3 files changed, 104 insertions(+), 57 deletions(-) delete mode 100644 server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java create mode 100644 server/base/src/main/java/org/apache/thrift/server/CustomNonBlockingServer.java diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java deleted file mode 100644 index f4737be296c..00000000000 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.server.rpc; - -import java.net.Socket; -import java.nio.channels.SelectionKey; - -import org.apache.thrift.server.THsHaServer; -import org.apache.thrift.transport.TNonblockingSocket; -import org.apache.thrift.transport.TNonblockingTransport; - -/** - * This class implements a custom non-blocking thrift server that stores the client address in thread-local storage for the invocation. - * - */ -public class CustomNonBlockingServer extends THsHaServer { - - public CustomNonBlockingServer(Args args) { - super(args); - } - - protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) { - return new CustomAsyncFrameBuffer(trans, selectionKey, selectThread); - } - - private class CustomAsyncFrameBuffer extends AsyncFrameBuffer { - - public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) { - super(trans, selectionKey, selectThread); - } - - @Override - public void invoke() { - if (trans_ instanceof TNonblockingSocket) { - TNonblockingSocket tsock = (TNonblockingSocket) trans_; - Socket sock = tsock.getSocketChannel().socket(); - TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort()); - } - super.invoke(); - } - } - -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index f474dc354f3..9b2e43a0d79 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -51,6 +51,7 @@ import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.CustomNonBlockingServer; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TNonblockingServerSocket; diff --git a/server/base/src/main/java/org/apache/thrift/server/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/thrift/server/CustomNonBlockingServer.java new file mode 100644 index 00000000000..ba3d2bf3c11 --- /dev/null +++ b/server/base/src/main/java/org/apache/thrift/server/CustomNonBlockingServer.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.thrift.server; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.Socket; +import java.nio.channels.SelectionKey; + +import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TNonblockingTransport; + +/** + * This class implements a custom non-blocking thrift server that stores the client address in thread-local storage for the invocation. + * + */ +public class CustomNonBlockingServer extends THsHaServer { + + private final Field selectAcceptThreadField; + + public CustomNonBlockingServer(Args args) { + super(args); + + try { + selectAcceptThreadField = TNonblockingServer.class.getDeclaredField("selectAcceptThread_"); + selectAcceptThreadField.setAccessible(true); + } catch (Exception e) { + throw new RuntimeException("Failed to access required field in Thrift code.", e); + } + } + + @Override + protected boolean startThreads() { + // Yet another dirty/gross hack to get access to the client's address. + + // start the selector + try { + // Hack in our SelectAcceptThread impl + SelectAcceptThread selectAcceptThread_ = new CustomerSelectAcceptThread((TNonblockingServerTransport)serverTransport_); + // Set the private field before continuing. + selectAcceptThreadField.set(this, selectAcceptThread_); + + selectAcceptThread_.start(); + return true; + } catch (IOException e) { + LOGGER.error("Failed to start selector thread!", e); + return false; + } catch (IllegalAccessException | IllegalArgumentException e) { + throw new RuntimeException("Exception setting customer select thread in Thrift"); + } + } + + + private class CustomerSelectAcceptThread extends SelectAcceptThread { + + public CustomerSelectAcceptThread(TNonblockingServerTransport serverTransport) throws IOException { + super(serverTransport); + } + + @Override + protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) { + if (processorFactory_.isAsyncProcessor()) { + throw new IllegalStateException("This implementation does not support AsyncProcessors"); + } + + return new CustomFrameBuffer(trans, selectionKey, selectThread); + } + } + + private class CustomFrameBuffer extends FrameBuffer { + + public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) { + super(trans, selectionKey, selectThread); + } + + @Override + public void invoke() { + if (trans_ instanceof TNonblockingSocket) { + TNonblockingSocket tsock = (TNonblockingSocket) trans_; + Socket sock = tsock.getSocketChannel().socket(); + TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort()); + } + super.invoke(); + } + } + +}