Commit 0ae211e

HBASE-16414 Improve performance for RPC encryption with Apache Common Crypto (Colin Ma)

Ramkrishna committed Oct 21, 2016
1 parent d3decaa commit 0ae211e
Showing 19 changed files with 3,666 additions and 831 deletions.
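The feature is opt-in and off by default. Below is a minimal sketch (not part of this commit) of the client-side configuration the patch reads; the two crypto keys appear in the RpcConnection changes further down, and Crypto AES additionally requires SASL encryption, i.e. hbase.rpc.protection set to privacy:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CryptoAesClientConfig {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // Crypto AES only applies when the negotiated SASL QOP is "privacy"
    conf.set("hbase.rpc.protection", "privacy");
    // opt into the Commons Crypto AES wrapping introduced by this patch (default: false)
    conf.setBoolean("hbase.rpc.crypto.encryption.aes.enabled", true);
    // cipher transformation sent in the ConnectionHeader (shown value is the default)
    conf.set("hbase.rpc.crypto.encryption.aes.cipher.transform", "AES/CTR/NoPadding");
    return conf;
  }
}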
BlockingRpcConnection.java
@@ -60,12 +60,14 @@
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -111,6 +113,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {

private byte[] connectionHeaderWithLength;

private boolean waitingConnectionHeaderResponse = false;

/**
* If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
* java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
@@ -349,7 +353,8 @@ private boolean setupSaslConnection(final InputStream in2, final OutputStream ou
throws IOException {
saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal,
this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection",
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
return saslRpcClient.saslConnect(in2, out2);
}

@@ -462,8 +467,8 @@ public Boolean run() throws IOException {
}
if (continueSasl) {
// Sasl connect is successful. Let's set up Sasl i/o streams.
inStream = saslRpcClient.getInputStream(inStream);
outStream = saslRpcClient.getOutputStream(outStream);
inStream = saslRpcClient.getInputStream();
outStream = saslRpcClient.getOutputStream();
} else {
// fall back to simple auth because server told us so.
// do not change authMethod and useSasl here, we should start from secure when
@@ -474,6 +479,9 @@ public Boolean run() throws IOException {
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
// Now write out the connection header
writeConnectionHeader();
// process the server's response to the connection header, if one is expected
processResponseForConnectionHeader();

break;
}
} catch (Throwable t) {
@@ -511,10 +519,60 @@ private void writeConnectionHeaderPreamble(OutputStream out) throws IOException
* Write the connection header.
*/
private void writeConnectionHeader() throws IOException {
boolean isCryptoAesEnable = false;
// check if Crypto AES is enabled
if (saslRpcClient != null) {
boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.
getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP());
isCryptoAesEnable = saslEncryptionEnabled && conf.getBoolean(
CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
}

// if Crypto AES is enabled, expect a connection header response from the server
if (isCryptoAesEnable) {
waitingConnectionHeaderResponse = true;
}
this.out.write(connectionHeaderWithLength);
this.out.flush();
}

private void processResponseForConnectionHeader() throws IOException {
// if no response is expected, return
if (!waitingConnectionHeaderResponse) return;
try {
// read the ConnectionHeaderResponse from the server
int len = this.in.readInt();
byte[] buff = new byte[len];
// readFully, since a single read() may return fewer than len bytes
this.in.readFully(buff);
if (LOG.isDebugEnabled()) {
LOG.debug("Length of response for connection header: " + len);
}

RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
RPCProtos.ConnectionHeaderResponse.parseFrom(buff);

// Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
if (connectionHeaderResponse.hasCryptoCipherMeta()) {
negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
}
waitingConnectionHeaderResponse = false;
} catch (SocketTimeoutException ste) {
LOG.fatal("Timed out waiting for the connection header response. Check that the server" +
" is configured to support Crypto AES negotiation.", ste);
// the server never answered the connection header; give up and surface the timeout
throw new IOException("Timeout while waiting for the connection header response", ste);
}
}

private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
throws IOException {
// initialize the Crypto AES cipher with the negotiated CryptoCipherMeta
saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
// reset the inputStream/outputStream for Crypto AES encryption
this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
}

private void tracedWriteRequest(Call call) throws IOException {
try (TraceScope ignored = Trace.startSpan("RpcClientImpl.tracedWriteRequest", call.span)) {
writeRequest(call);
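After negotiateCryptoAes above, both directions of the blocking connection run through AES streams. HBase's CryptoAES builds on Apache Commons Crypto; the following standalone sketch only illustrates the underlying library (placeholder all-zero key and IV, not the values negotiated via CryptoCipherMeta):

import java.io.ByteArrayOutputStream;
import java.util.Properties;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.crypto.stream.CryptoOutputStream;

public class CommonsCryptoSketch {
  public static byte[] encrypt(byte[] plain) throws Exception {
    byte[] key = new byte[16]; // 128-bit AES key, all zeros for illustration only
    byte[] iv = new byte[16];  // CTR-mode IV; in HBase both come from CryptoCipherMeta
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (CryptoOutputStream out = new CryptoOutputStream("AES/CTR/NoPadding",
        new Properties(), sink, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))) {
      out.write(plain);
    }
    return sink.toByteArray();
  }
}

The mirror-image CryptoInputStream decrypts with the same transformation, key, and IV.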
NettyRpcConnection.java
@@ -22,6 +22,8 @@
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;

import io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;

import io.netty.bootstrap.Bootstrap;
@@ -55,7 +57,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;

@@ -130,8 +131,7 @@ public synchronized void cleanupConnection() {
}
}

private void established(Channel ch) {
ch.write(connectionHeaderWithLength.retainedDuplicate());
private void established(Channel ch) throws IOException {
ChannelPipeline p = ch.pipeline();
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
p.addBefore(addBeforeHandler, null,
@@ -188,11 +188,10 @@ private void saslNegotiate(final Channel ch) {
return;
}
Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
ChannelHandler saslHandler;
final NettyHBaseSaslRpcClientHandler saslHandler;
try {
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token,
serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf.get(
"hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf);
} catch (IOException e) {
failInit(ch, e);
return;
@@ -206,7 +205,41 @@ public void operationComplete(Future<Boolean> future) throws Exception {
ChannelPipeline p = ch.pipeline();
p.remove(SaslChallengeDecoder.class);
p.remove(NettyHBaseSaslRpcClientHandler.class);
established(ch);

// check whether we need to negotiate the connection header with the server
if (saslHandler.isNeedProcessConnectionHeader()) {
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
// create the handler to handle the connection header
ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
connectionHeaderPromise, conf, connectionHeaderWithLength);

// add a ReadTimeoutHandler in case the server never responds to the connection
// header, e.g. because client and server configurations differ
p.addFirst(new ReadTimeoutHandler(
RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
p.addLast(chHandler);
connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = ch.pipeline();
p.remove(ReadTimeoutHandler.class);
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
// no need to send the connection header here;
// NettyHBaseRpcConnectionHeaderHandler has already sent it
established(ch);
} else {
final Throwable error = future.cause();
scheduleRelogin(error);
failInit(ch, toIOE(error));
}
}
});
} else {
// send the connection header to the server
ch.write(connectionHeaderWithLength.retainedDuplicate());
established(ch);
}
} else {
final Throwable error = future.cause();
scheduleRelogin(error);
@@ -240,6 +273,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
if (useSasl) {
saslNegotiate(ch);
} else {
// send the connection header to the server
ch.write(connectionHeaderWithLength.retainedDuplicate());
established(ch);
}
}
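For context: the ReadTimeoutHandler installed above fires a ReadTimeoutException when no inbound bytes arrive within the window, and that exception is what ultimately fails connectionHeaderPromise when the server never answers. The adapter below is a hypothetical illustration of how such a failure surfaces; it is not the commit's NettyHBaseRpcConnectionHeaderHandler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.concurrent.Promise;

class HeaderResponseTimeoutAdapter extends ChannelInboundHandlerAdapter {
  private final Promise<Boolean> promise;

  HeaderResponseTimeoutAdapter(Promise<Boolean> promise) {
    this.promise = promise;
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    if (cause instanceof ReadTimeoutException) {
      // the server never answered the connection header; fail the negotiation
      promise.tryFailure(cause);
    } else {
      ctx.fireExceptionCaught(cause);
    }
  }
}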
RpcConnection.java
@@ -72,6 +72,12 @@ abstract class RpcConnection {

protected final HashedWheelTimer timeoutTimer;

protected final Configuration conf;

protected static final String CRYPTO_AES_ENABLED_KEY = "hbase.rpc.crypto.encryption.aes.enabled";

protected static final boolean CRYPTO_AES_ENABLED_DEFAULT = false;

// the last time we were picked up from connection pool.
protected long lastTouched;

@@ -84,6 +90,7 @@ protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, Conne
this.timeoutTimer = timeoutTimer;
this.codec = codec;
this.compressor = compressor;
this.conf = conf;

UserGroupInformation ticket = remoteId.getTicket().getUGI();
SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
@@ -224,6 +231,12 @@ protected ConnectionHeader getConnectionHeader() {
builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
}
builder.setVersionInfo(ProtobufUtil.getVersionInfo());
boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
// if Crypto AES is enabled, set up the cipher transformation
if (isCryptoAESEnable) {
builder.setRpcCryptoCipherTransformation(
conf.get("hbase.rpc.crypto.encryption.aes.cipher.transform", "AES/CTR/NoPadding"));
}
return builder.build();
}

UnsupportedCryptoException.java
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.ipc;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class UnsupportedCryptoException extends FatalConnectionException {
public UnsupportedCryptoException() {
super();
}

public UnsupportedCryptoException(String msg) {
super(msg);
}

public UnsupportedCryptoException(String msg, Throwable t) {
super(msg, t);
}
}
CryptoAESUnwrapHandler.java
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.security;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;

/**
* Unwrap messages with Crypto AES. Should be placed after a
* {@link io.netty.handler.codec.LengthFieldBasedFrameDecoder}
*/
@InterfaceAudience.Private
public class CryptoAESUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {

private final CryptoAES cryptoAES;

public CryptoAESUnwrapHandler(CryptoAES cryptoAES) {
this.cryptoAES = cryptoAES;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
ctx.fireChannelRead(Unpooled.wrappedBuffer(cryptoAES.unwrap(bytes, 0, bytes.length)));
}
}
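As the class javadoc notes, the unwrap handler expects one complete wrapped message per ByteBuf. A minimal wiring sketch, assuming each wrapped message carries a 4-byte length prefix (the frame-layout numbers are assumptions for illustration):

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.security.CryptoAESUnwrapHandler;

public class CryptoAesPipelineSketch {
  public static void addUnwrap(Channel ch, CryptoAES cryptoAES) {
    ChannelPipeline p = ch.pipeline();
    // split the inbound stream into length-prefixed frames and strip the 4-byte
    // prefix, so CryptoAESUnwrapHandler sees one wrapped message per ByteBuf
    p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
    p.addLast(new CryptoAESUnwrapHandler(cryptoAES));
  }
}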
