From 8d067c8d9adbf07cbfd1304e2306e1f1f408f647 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andor=20Moln=C3=A1r?= Date: Mon, 30 Oct 2017 15:28:28 +0100 Subject: [PATCH] ZOOKEEPER-2101: This patch has been created to reanimate an ancient, unclosed Jira: https://issues.apache.org/jira/browse/ZOOKEEPER-2101 Original patch was done by Liu Shaohui and applied to latest trunk without any modification. This one would be a nice kick off for implementing jura buffer size monitoring. --- .../org/apache/jute/BinaryOutputArchive.java | 5 ++++ .../server/PrepRequestProcessor.java | 14 ++++++++++ .../apache/zookeeper/server/ZKDatabase.java | 19 +++++-------- .../zookeeper/server/quorum/Leader.java | 16 +++-------- .../zookeeper/server/util/SerializeUtils.java | 20 ++++++++++++++ .../apache/zookeeper/test/BufferSizeTest.java | 27 ++++++++++++++----- 6 files changed, 68 insertions(+), 33 deletions(-) diff --git a/src/java/main/org/apache/jute/BinaryOutputArchive.java b/src/java/main/org/apache/jute/BinaryOutputArchive.java index a8f313c9fc6..f4f8549c905 100644 --- a/src/java/main/org/apache/jute/BinaryOutputArchive.java +++ b/src/java/main/org/apache/jute/BinaryOutputArchive.java @@ -115,6 +115,11 @@ public void writeBuffer(byte barr[], String tag) out.writeInt(-1); return; } + if (barr.length >= BinaryInputArchive.maxBuffer) { + throw new IOException("Len error " + barr.length + + ", larger than max buffer: " + + BinaryInputArchive.maxBuffer + " set by jute.maxbuffer"); + } out.writeInt(barr.length); out.write(barr); } diff --git a/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java index 537ef71a135..8df68fae7fb 100644 --- a/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -18,6 +18,7 @@ package org.apache.zookeeper.server; +import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; @@ -51,6 +52,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.txn.CheckVersionTxn; import org.apache.zookeeper.txn.CreateContainerTxn; import org.apache.zookeeper.txn.CreateSessionTxn; @@ -905,10 +907,22 @@ protected void pRequest(Request request) throws RequestProcessorException { request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue())); } } + checkProposalSize(request); request.zxid = zks.getZxid(); nextProcessor.processRequest(request); } + private void checkProposalSize(Request request) { + if (request.getHdr() == null) return; + byte[] data = SerializeUtils.serializeRequest(request); + if (data.length > BinaryInputArchive.maxBuffer) { + LOG.error("Len error {}, larger than max buffer: {} set by jute.maxbuffer", + data.length, BinaryInputArchive.maxBuffer); + request.getHdr().setType(OpCode.error); + request.setTxn(new ErrorTxn(Code.BADARGUMENTS.intValue())); + } + } + private List removeDuplicates(List acl) { List retval = new LinkedList(); diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java index 16baf46f05f..c302d975a90 100644 --- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java +++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java @@ -18,7 +18,6 @@ package org.apache.zookeeper.server; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.util.Collection; @@ -32,7 +31,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; import org.apache.jute.Record; @@ -264,19 +262,14 @@ public void addCommittedProposal(Request request) { maxCommittedLog = request.zxid; } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - try { - request.getHdr().serialize(boa, "hdr"); - if (request.getTxn() != null) { - request.getTxn().serialize(boa, "txn"); - } - baos.close(); - } catch (IOException e) { - LOG.error("This really should be impossible", e); + byte[] data = SerializeUtils.serializeRequest(request); + if (request.request != null) { + LOG.debug("Request type: {}, size: {}, zxid: {}," + + " Proposal size: {}", request.type, + request.request.capacity(), request.zxid, data.length); } QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, - baos.toByteArray(), null); + data, null); Proposal p = new Proposal(); p.packet = pp; p.request = request; diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java index cf2eb378e2e..41bea12f30f 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java @@ -48,6 +48,7 @@ import org.apache.zookeeper.server.ZooKeeperCriticalThread; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.util.ZxidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1047,20 +1048,9 @@ public Proposal propose(Request request) throws XidRolloverException { throw new XidRolloverException(msg); } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - try { - request.getHdr().serialize(boa, "hdr"); - if (request.getTxn() != null) { - request.getTxn().serialize(boa, "txn"); - } - baos.close(); - } catch (IOException e) { - LOG.warn("This really should be impossible", e); - } + byte[] data = SerializeUtils.serializeRequest(request); QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, - baos.toByteArray(), null); - + data, null); Proposal p = new Proposal(); p.packet = pp; p.request = request; diff --git a/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java b/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java index b7936f1a4d5..eccf5270bd0 100644 --- a/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java +++ b/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java @@ -19,11 +19,14 @@ package org.apache.zookeeper.server.util; import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.common.IOUtils; import org.apache.zookeeper.server.DataTree; +import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.txn.CreateContainerTxn; import org.apache.zookeeper.txn.CreateSessionTxn; @@ -40,6 +43,7 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.util.HashMap; @@ -148,4 +152,20 @@ public static void serializeSnapshot(DataTree dt,OutputArchive oa, dt.serialize(oa, "tree"); } + public static byte[] serializeRequest(Request request) { + if (request == null || request.getHdr() == null) return null; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + try { + request.getHdr().serialize(boa, "hdr"); + if (request.getTxn() != null) { + request.getTxn().serialize(boa, "txn"); + } + } catch (IOException e) { + LOG.error("This really should be impossible", e); + } finally { + IOUtils.cleanup(LOG, baos); + } + return baos.toByteArray(); + } } diff --git a/src/java/test/org/apache/zookeeper/test/BufferSizeTest.java b/src/java/test/org/apache/zookeeper/test/BufferSizeTest.java index 6d74e54b5d4..9aba5b73fb1 100644 --- a/src/java/test/org/apache/zookeeper/test/BufferSizeTest.java +++ b/src/java/test/org/apache/zookeeper/test/BufferSizeTest.java @@ -30,20 +30,26 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; public class BufferSizeTest extends ClientBase { - public static final int TEST_MAXBUFFER = 100; + public static final int TEST_MAXBUFFER = 1000; private static final File TEST_DATA = new File( System.getProperty("test.data.dir", "build/test/data"), "buffersize"); private ZooKeeper zk; - @Before - public void setMaxBuffer() throws IOException, InterruptedException { + @BeforeClass + public static void setMaxBuffer() throws IOException, InterruptedException { System.setProperty("jute.maxbuffer", "" + TEST_MAXBUFFER); - assertEquals("Can't set jute.maxbuffer!", TEST_MAXBUFFER, BinaryInputArchive.maxBuffer); + } + + @Before + public void checkMaxBuffer() throws IOException, InterruptedException { + assertEquals("Can't set jute.maxbuffer!", TEST_MAXBUFFER, + BinaryInputArchive.maxBuffer); zk = createClient(); } @@ -71,12 +77,19 @@ public void execute(byte[] data) throws Exception { /** Issues requests containing data smaller, equal, and greater than TEST_MAXBUFFER. */ private void testRequests(ClientOp clientOp) throws Exception { - clientOp.execute(new byte[TEST_MAXBUFFER - 60]); + clientOp.execute(new byte[TEST_MAXBUFFER - 200]); + try { + // This should fail since the buffer size > the data size due to extra fields + clientOp.execute(new byte[TEST_MAXBUFFER - 10]); + fail("Request exceeding jute.maxbuffer succeeded!"); + } catch (KeeperException.ConnectionLossException e) {} + try { // This should fail since the buffer size > the data size due to extra fields clientOp.execute(new byte[TEST_MAXBUFFER]); fail("Request exceeding jute.maxbuffer succeeded!"); } catch (KeeperException.ConnectionLossException e) {} + try { clientOp.execute(new byte[TEST_MAXBUFFER + 10]); fail("Request exceeding jute.maxbuffer succeeded!"); @@ -90,8 +103,8 @@ private interface ClientOp { @Test public void testStartup() throws Exception { final String path = "/test_node"; - zk.create(path, new byte[TEST_MAXBUFFER - 60], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.setData(path, new byte[TEST_MAXBUFFER - 50], -1); + zk.create(path, new byte[TEST_MAXBUFFER - 200], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.setData(path, new byte[TEST_MAXBUFFER - 150], -1); stopServer(); startServer();