Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed Auth with v2 protocol #1805

Merged
merged 3 commits into from
Nov 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,19 @@ static class ClientSideHandler extends ChannelDuplexHandler {
final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>();
final ClientConnectionPeer connectionPeer;

private final boolean isUsingV2Protocol;

public ClientAuthProvider getAuthProvider() {
return authProvider;
}

ClientSideHandler(ClientAuthProvider.Factory authProviderFactory, AtomicLong transactionIdGenerator,
ClientConnectionPeer connectionPeer) {
ClientConnectionPeer connectionPeer, boolean isUsingV2Protocol) {
this.authProviderFactory = authProviderFactory;
this.transactionIdGenerator = transactionIdGenerator;
this.connectionPeer = connectionPeer;
authProvider = null;
this.isUsingV2Protocol = isUsingV2Protocol;
}

@Override
Expand Down Expand Up @@ -279,7 +282,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())){
SocketAddress remote = ctx.channel().remoteAddress();
LOG.info("Authentication is not enabled."
+ "Considering this client {0} authenticated", remote);
+ "Considering this client {} authenticated", remote);
AuthHandshakeCompleteCallback cb = new AuthHandshakeCompleteCallback(ctx);
cb.operationComplete(BKException.Code.OK, null);
return;
Expand All @@ -296,6 +299,33 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
break;
}
}
} else if (msg instanceof BookieProtocol.Response) {
BookieProtocol.Response resp = (BookieProtocol.Response) msg;
switch (resp.opCode) {
case BookieProtocol.AUTH:
if (resp.errorCode != BookieProtocol.EOK) {
authenticationError(ctx, resp.errorCode);
} else {
BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) resp).authMessage;
if (AUTHENTICATION_DISABLED_PLUGIN_NAME.equals(am.getAuthPluginName())) {
SocketAddress remote = ctx.channel().remoteAddress();
LOG.info("Authentication is not enabled."
+ "Considering this client {} authenticated", remote);
AuthHandshakeCompleteCallback cb = new AuthHandshakeCompleteCallback(ctx);
cb.operationComplete(BKException.Code.OK, null);
return;
}
byte[] payload = am.getPayload().toByteArray();
authProvider.process(AuthToken.wrap(payload), new AuthRequestCallback(ctx,
authProviderFactory.getPluginName()));
}
break;
default:
LOG.warn("dropping received message {} from bookie {}", msg, ctx.channel());
// else just drop the message, we're not authenticated so nothing should be coming
// through
break;
}
}
}

Expand All @@ -319,7 +349,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
} else if (msg instanceof BookieProtocol.Request) {
// let auth messages through, queue the rest
BookieProtocol.Request req = (BookieProtocol.Request) msg;
if (BookkeeperProtocol.OperationType.AUTH.getNumber() == req.getOpCode()) {
if (BookieProtocol.AUTH == req.getOpCode()) {
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
Expand Down Expand Up @@ -356,16 +386,24 @@ public void operationComplete(int rc, AuthToken newam) {
authenticationError(ctx, rc);
return;
}

AuthMessage message = AuthMessage.newBuilder().setAuthPluginName(pluginName)
.setPayload(ByteString.copyFrom(newam.getData())).build();

BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
.setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder().setHeader(header)
.setAuthRequest(message);

channel.writeAndFlush(builder.build());
if (isUsingV2Protocol) {
channel.writeAndFlush(
new BookieProtocol.AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, message),
channel.voidPromise());
} else {
// V3 protocol
BookkeeperProtocol.BKPacketHeader header = BookkeeperProtocol.BKPacketHeader.newBuilder()
.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
.setOperation(BookkeeperProtocol.OperationType.AUTH).setTxnId(newTxnId()).build();
BookkeeperProtocol.Request.Builder builder = BookkeeperProtocol.Request.newBuilder()
.setHeader(header)
.setAuthRequest(message);
channel.writeAndFlush(builder.build());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,17 @@ public void processRequest(Object msg, Channel c) {
checkArgument(r instanceof BookieProtocol.ReadRequest);
processReadRequest((BookieProtocol.ReadRequest) r, c);
break;
case BookieProtocol.AUTH:
LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
.newBuilder()
.setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME)
.setPayload(ByteString.copyFrom(AuthToken.NULL.getData()))
.build();

c.writeAndFlush(new BookieProtocol.AuthResponse(
BookieProtocol.CURRENT_PROTOCOL_VERSION, message));
break;
default:
LOG.error("Unknown op type {}, sending error", r.getOpCode());
c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ protected void initChannel(Channel ch) throws Exception {
"bookieProtoDecoder",
new BookieProtoEncoding.ResponseDecoder(extRegistry, useV2WireProtocol));
pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
connectionPeer));
connectionPeer, useV2WireProtocol));
pipeline.addLast("mainhandler", PerChannelBookieClient.this);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -45,12 +46,16 @@
import org.apache.bookkeeper.proto.ClientConnectionPeer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test authentication.
*/
@RunWith(Parameterized.class)
public class TestAuth extends BookKeeperClusterTestCase {
static final Logger LOG = LoggerFactory.getLogger(TestAuth.class);
public static final String TEST_AUTH_PROVIDER_PLUGIN_NAME = "TestAuthProviderPlugin";
Expand All @@ -61,8 +66,29 @@ public class TestAuth extends BookKeeperClusterTestCase {
private static final byte[] FAILURE_RESPONSE = {2};
private static final byte[] PAYLOAD_MESSAGE = {3};

public TestAuth() {
enum ProtocolVersion {
ProtocolV2, ProtocolV3
}

@Parameters
public static Collection<Object[]> configs() {
return Arrays.asList(new Object[][] {
{ ProtocolVersion.ProtocolV2 },
{ ProtocolVersion.ProtocolV3 },
});
}

private final ProtocolVersion protocolVersion;

public TestAuth(ProtocolVersion protocolVersion) {
super(0); // start them later when auth providers are configured
this.protocolVersion = protocolVersion;
}

protected ClientConfiguration newClientConfiguration() {
ClientConfiguration conf = super.newClientConfiguration();
conf.setUseV2WireProtocol(protocolVersion == ProtocolVersion.ProtocolV2);
return conf;
}

// we pass in ledgerId because the method may throw exceptions
Expand Down Expand Up @@ -136,6 +162,13 @@ public void testSingleMessageAuth() throws Exception {

@Test
public void testCloseMethodCalledOnAuthProvider() throws Exception {
LogCloseCallsBookieAuthProviderFactory.closeCountersOnFactory.set(0);
LogCloseCallsBookieAuthProviderFactory.closeCountersOnConnections.set(0);
LogCloseCallsBookieAuthProviderFactory.initCountersOnFactory.set(0);
LogCloseCallsBookieAuthProviderFactory.initCountersOnConnections.set(0);
LogCloseCallsClientAuthProviderFactory.initCountersOnFactory.set(0);
LogCloseCallsClientAuthProviderFactory.closeCountersOnFactory.set(0);

ServerConfiguration bookieConf = newServerConfiguration();
bookieConf.setBookieAuthProviderFactoryClass(
LogCloseCallsBookieAuthProviderFactory.class.getName());
Expand Down Expand Up @@ -272,6 +305,11 @@ public void testDifferentPluginFailure() throws Exception {
} catch (BKException.BKUnauthorizedAccessException bke) {
// bookie should have sent a negative response before
// breaking the conneciton
assertEquals(ProtocolVersion.ProtocolV3, protocolVersion);
} catch (BKException.BKNotEnoughBookiesException nebe) {
// With V2 we don't get the authorization error, but rather just
// fail to write to bookies.
assertEquals(ProtocolVersion.ProtocolV2, protocolVersion);
}
assertFalse(ledgerId.get() == -1);
assertEquals("Shouldn't have entry", 0, entryCount(ledgerId.get(), bookieConf, clientConf));
Expand Down