Skip to content

Commit

Permalink
Fixed Auth with v2 protocol
Browse files Browse the repository at this point in the history
### Motivation

BK auth framework is currently broken when using v2 protocol.

### Changes

 * Fixed auth when using V2 protocol
 * Made sure a client with authentication enabled can talk to a bookie without authentication. This is required in any case when enabling/disabling authentication on a live cluster.
 * Run all auth tests against both v2 and v3 protocol.

This should be included in 4.7.2 to give a path to upgrade.

cc/ rdhabalia

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>

This closes #1805 from merlimat/fix-v2-auth

(cherry picked from commit dc2aaaa)
Signed-off-by: Sijie Guo <sijie@apache.org>
  • Loading branch information
merlimat authored and sijie committed Nov 11, 2018
1 parent 411256a commit 8b1c2a3
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,19 @@ static class ClientSideHandler extends ChannelDuplexHandler {
final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>();
final ClientConnectionPeer connectionPeer;

private final boolean isUsingV2Protocol;

public ClientAuthProvider getAuthProvider() {
return authProvider;
}

ClientSideHandler(ClientAuthProvider.Factory authProviderFactory, AtomicLong transactionIdGenerator,
ClientConnectionPeer connectionPeer) {
ClientConnectionPeer connectionPeer, boolean isUsingV2Protocol) {
this.authProviderFactory = authProviderFactory;
this.transactionIdGenerator = transactionIdGenerator;
this.connectionPeer = connectionPeer;
authProvider = null;
this.isUsingV2Protocol = isUsingV2Protocol;
}

@Override
Expand Down Expand Up @@ -279,7 +282,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())){
SocketAddress remote = ctx.channel().remoteAddress();
LOG.info("Authentication is not enabled."
+ "Considering this client {0} authenticated", remote);
+ "Considering this client {} authenticated", remote);
AuthHandshakeCompleteCallback cb = new AuthHandshakeCompleteCallback(ctx);
cb.operationComplete(BKException.Code.OK, null);
return;
Expand All @@ -296,6 +299,33 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
break;
}
}
} else if (msg instanceof BookieProtocol.Response) {
BookieProtocol.Response resp = (BookieProtocol.Response) msg;
switch (resp.opCode) {
case BookieProtocol.AUTH:
if (resp.errorCode != BookieProtocol.EOK) {
authenticationError(ctx, resp.errorCode);
} else {
BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) resp).authMessage;
if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())) {
SocketAddress remote = ctx.channel().remoteAddress();
LOG.info("Authentication is not enabled."
+ "Considering this client {} authenticated", remote);
AuthHandshakeCompleteCallback cb = new AuthHandshakeCompleteCallback(ctx);
cb.operationComplete(BKException.Code.OK, null);
return;
}
byte[] payload = am.getPayload().toByteArray();
authProvider.process(AuthToken.wrap(payload), new AuthRequestCallback(ctx,
authProviderFactory.getPluginName()));
}
break;
default:
LOG.warn("dropping received message {} from bookie {}", msg, ctx.channel());
// else just drop the message, we're not authenticated so nothing should be coming
// through
break;
}
}
}

Expand All @@ -319,7 +349,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
} else if (msg instanceof BookieProtocol.Request) {
// let auth messages through, queue the rest
BookieProtocol.Request req = (BookieProtocol.Request) msg;
if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) {
if (BookieProtocol.AUTH == req.getOpCode()) {
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
Expand Down Expand Up @@ -356,16 +386,24 @@ public void operationComplete(int rc, AuthToken newam) {
authenticationError(ctx, rc);
return;
}

AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(pluginName)
.setPayload(ByteString.copyFrom(newam.getData())).build();

BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
.setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder().setHeader(header)
.setAuthRequest(message);

channel.writeAndFlush(builder.build());
if (isUsingV2Protocol) {
channel.writeAndFlush(
new BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message),
channel.voidPromise());
} else {
// V3 protocol
BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
.setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder()
.setHeader(header)
.setAuthRequest(message);
channel.writeAndFlush(builder.build());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,17 @@ public void processRequest(Object msg, Channel c) {
checkArgument(r instanceof BookieProtocol.ReadRequest);
processReadRequest((BookieProtocol.ReadRequest) r, c);
break;
case BookieProtocol.AUTH:
LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
.newBuilder()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
.build();

c.writeAndFlush(new BookieProtocol.AuthResponse(
BookieProtocol.CURRENT_PROTOCOL_VERSION, message));
break;
default:
LOG.error("Unknown op type {}, sending error", r.getOpCode());
c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ protected void initChannel(Channel ch) throws Exception {
"bookieProtoDecoder",
new BookieProtoEncoding.ResponseDecoder(extRegistry, useV2WireProtocol));
pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
connectionPeer));
connectionPeer, useV2WireProtocol));
pipeline.addLast("mainhandler", PerChannelBookieClient.this);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -45,12 +46,16 @@
import org.apache.bookkeeper.proto.ClientConnectionPeer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test authentication.
*/
@RunWith(Parameterized.class)
public class TestAuth extends BookKeeperClusterTestCase {
static final Logger LOG = LoggerFactory.getLogger(TestAuth.class);
public static final String TEST_AUTH_PROVIDER_PLUGIN_NAME = "TestAuthProviderPlugin";
Expand All @@ -61,8 +66,29 @@ public class TestAuth extends BookKeeperClusterTestCase {
private static final byte[] FAILURE_RESPONSE = {2};
private static final byte[] PAYLOAD_MESSAGE = {3};

public TestAuth() {
enum ProtocolVersion {
ProtocolV2, ProtocolV3
}

@Parameters
public static Collection<Object[]> configs() {
return Arrays.asList(new Object[][] {
{ ProtocolVersion.ProtocolV2 },
{ ProtocolVersion.ProtocolV3 },
});
}

private final ProtocolVersion protocolVersion;

public TestAuth(ProtocolVersion protocolVersion) {
super(0); // start them later when auth providers are configured
this.protocolVersion = protocolVersion;
}

protected ClientConfiguration newClientConfiguration() {
ClientConfiguration conf = super.newClientConfiguration();
conf.setUseV2WireProtocol(protocolVersion == ProtocolVersion.ProtocolV2);
return conf;
}

// we pass in ledgerId because the method may throw exceptions
Expand Down Expand Up @@ -135,6 +161,13 @@ public void testSingleMessageAuth() throws Exception {

@Test
public void testCloseMethodCalledOnAuthProvider() throws Exception {
LogCloseCallsBookieAuthProviderFactory.closeCountersOnFactory.set(0);
LogCloseCallsBookieAuthProviderFactory.closeCountersOnConnections.set(0);
LogCloseCallsBookieAuthProviderFactory.initCountersOnFactory.set(0);
LogCloseCallsBookieAuthProviderFactory.initCountersOnConnections.set(0);
LogCloseCallsClientAuthProviderFactory.initCountersOnFactory.set(0);
LogCloseCallsClientAuthProviderFactory.closeCountersOnFactory.set(0);

ServerConfiguration bookieConf = newServerConfiguration();
bookieConf.setBookieAuthProviderFactoryClass(
LogCloseCallsBookieAuthProviderFactory.class.getName());
Expand Down Expand Up @@ -271,6 +304,11 @@ public void testDifferentPluginFailure() throws Exception {
} catch (BKException.BKUnauthorizedAccessException bke) {
// bookie should have sent a negative response before
// breaking the conneciton
assertEquals(ProtocolVersion.ProtocolV3, protocolVersion);
} catch (BKException.BKNotEnoughBookiesException nebe) {
// With V2 we don't get the authorization error, but rather just
// fail to write to bookies.
assertEquals(ProtocolVersion.ProtocolV2, protocolVersion);
}
assertFalse(ledgerId.get() == -1);
assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
Expand Down

0 comments on commit 8b1c2a3

Please sign in to comment.