Skip to content

Commit

Permalink
Disable using v2 decoder to decode v3 response if usingV2WireProtocol…
Browse files Browse the repository at this point in the history
… is enabled and handle unknown op in v2 request decoder

Descriptions of the changes in this PR:

*Problem*

- response decoder should use whatever protocol that request is using to decode the response. otherwise it is making wrong assumption on response protocol.
- v2 request decoder doesn't handle unknown op correctly, which will cause netty double-release bytebuf

*Solution*

- make sure response decoder respect the protocol that client is using for sending requests
- v2 request decoder should throw exceptions when fail to decode unknown request

Related issues: #198

Author: Sijie Guo <sijie@apache.org>

Reviewers: Ivan Kelly <ivank@apache.org>, Enrico Olivelli <eolivelli@gmail.com>, Jia Zhai <None>, Matteo Merli <mmerli@apache.org>

This closes #1240 from sijie/bookkeeper_fix_bytebuf_issues
  • Loading branch information
sijie committed Mar 10, 2018
1 parent f0757e9 commit 59e48a3
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,10 @@ public Object decode(ByteBuf packet)
BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
builder.mergeFrom(new ByteBufInputStream(packet), extensionRegistry);
return new BookieProtocol.AuthRequest(version, builder.build());
}

return packet;
default:
throw new IllegalStateException("Received unknown request op code = " + opCode);
}
}

private static byte[] readMasterKey(ByteBuf packet) {
Expand Down Expand Up @@ -474,12 +475,12 @@ protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out)
public static class ResponseDecoder extends MessageToMessageDecoder<Object> {
final EnDecoder repPreV3;
final EnDecoder repV3;
boolean usingV3Protocol;
boolean usingV2Protocol;

ResponseDecoder(ExtensionRegistry extensionRegistry) {
ResponseDecoder(ExtensionRegistry extensionRegistry, boolean useV2Protocol) {
repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
repV3 = new ResponseEnDecoderV3(extensionRegistry);
usingV3Protocol = true;
usingV2Protocol = useV2Protocol;
}

@Override
Expand All @@ -493,14 +494,8 @@ protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) t
ByteBuf buffer = (ByteBuf) msg;
buffer.markReaderIndex();

if (usingV3Protocol) {
try {
out.add(repV3.decode(buffer));
} catch (InvalidProtocolBufferException e) {
usingV3Protocol = false;
buffer.resetReaderIndex();
out.add(repPreV3.decode(buffer));
}
if (!usingV2Protocol) {
out.add(repV3.decode(buffer));
} else {
// If in the same connection we already got preV3 messages, don't try again to decode V3 messages
out.add(repPreV3.decode(buffer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,9 @@ protected void initChannel(Channel ch) throws Exception {
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry));
pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder(extRegistry));
pipeline.addLast(
"bookieProtoDecoder",
new BookieProtoEncoding.ResponseDecoder(extRegistry, useV2WireProtocol));
pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
connectionPeer));
pipeline.addLast("mainhandler", PerChannelBookieClient.this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.proto;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import org.apache.bookkeeper.proto.BookieProtoEncoding.RequestEnDeCoderPreV3;
import org.apache.bookkeeper.proto.BookieProtoEncoding.RequestEnDecoderV3;
import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseDecoder;
import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEnDeCoderPreV3;
import org.apache.bookkeeper.proto.BookieProtoEncoding.ResponseEnDecoderV3;
import org.apache.bookkeeper.proto.BookieProtocol.AddResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest.Flag;
import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.junit.Test;

/**
* Unit test {@link BookieProtoEncoding}.
*/
public class BookieProtoEncodingTest {

@Test
public void testV3ResponseDecoderNoFallback() throws Exception {
AddResponse v2Resp = AddResponse.create(
BookieProtocol.CURRENT_PROTOCOL_VERSION,
BookieProtocol.EOK,
1L,
2L);

BookkeeperProtocol.Response v3Resp = BookkeeperProtocol.Response.newBuilder()
.setHeader(BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setTxnId(1L)
.setOperation(OperationType.ADD_ENTRY)
.build())
.setStatus(StatusCode.EOK)
.setAddResponse(BookkeeperProtocol.AddResponse.newBuilder()
.setStatus(StatusCode.EOK)
.setLedgerId(1L)
.setEntryId(2L)
.build())
.build();

List<Object> outList = Lists.newArrayList();

ResponseEnDeCoderPreV3 v2Encoder = new ResponseEnDeCoderPreV3(null);
ResponseEnDecoderV3 v3Encoder = new ResponseEnDecoderV3(null);

ResponseDecoder v3Decoder = new ResponseDecoder(null, false);
try {
v3Decoder.decode(
mock(ChannelHandlerContext.class),
v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT),
outList
);
fail("V3 response decoder should fail on decoding v2 response");
} catch (InvalidProtocolBufferException e) {
// expected
}
assertEquals(0, outList.size());

v3Decoder.decode(
mock(ChannelHandlerContext.class),
v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT),
outList);
assertEquals(1, outList.size());
}

@Test(expected = IllegalStateException.class)
public void testV2RequestDecoderThrowExceptionOnUnknownRequests() throws Exception {
RequestEnDeCoderPreV3 v2ReqEncoder = new RequestEnDeCoderPreV3(null);
RequestEnDecoderV3 v3ReqEncoder = new RequestEnDecoderV3(null);

BookkeeperProtocol.Request v3Req = BookkeeperProtocol.Request.newBuilder()
.setHeader(BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setTxnId(1L)
.setOperation(OperationType.ADD_ENTRY)
.build())
.setAddRequest(BookkeeperProtocol.AddRequest.newBuilder()
.setLedgerId(1L)
.setEntryId(2L)
.setMasterKey(ByteString.copyFrom("", UTF_8))
.setFlag(Flag.RECOVERY_ADD)
.setBody(ByteString.copyFrom("test", UTF_8)))
.build();


v2ReqEncoder.decode((ByteBuf) v3ReqEncoder.encode(v3Req, UnpooledByteBufAllocator.DEFAULT));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,15 @@
import org.apache.bookkeeper.proto.BookieProtocol.Request;
import org.apache.bookkeeper.proto.BookieProtocol.Response;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test backward compatibility.
*/
public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
static final Logger LOG = LoggerFactory.getLogger(TestBackwardCompatCMS42.class);

private static final byte[] SUCCESS_RESPONSE = {1};
private static final byte[] FAILURE_RESPONSE = {2};
Expand Down Expand Up @@ -181,7 +179,9 @@ BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
}

CompatClient42 newCompatClient(BookieSocketAddress addr) throws Exception {
return new CompatClient42(executor, eventLoopGroup, addr, authProvider, extRegistry);
ClientConfiguration conf = new ClientConfiguration();
conf.setUseV2WireProtocol(true);
return new CompatClient42(conf, executor, eventLoopGroup, addr, authProvider, extRegistry);
}

// extending PerChannelBookieClient to get the pipeline factory
Expand All @@ -190,11 +190,21 @@ class CompatClient42 extends PerChannelBookieClient {
Channel channel;
final CountDownLatch connected = new CountDownLatch(1);

CompatClient42(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup,
CompatClient42(ClientConfiguration conf,
OrderedSafeExecutor executor,
EventLoopGroup eventLoopGroup,
BookieSocketAddress addr,
ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry) throws Exception {
super(executor, eventLoopGroup, addr, authProviderFactory, extRegistry);
super(
conf,
executor,
eventLoopGroup,
addr,
NullStatsLogger.INSTANCE,
authProviderFactory,
extRegistry,
null);

state = ConnectionState.CONNECTING;
ChannelFuture future = connect();
Expand Down

0 comments on commit 59e48a3

Please sign in to comment.