From 5cbca58114104901a10e8db490897bc32f8f3bdf Mon Sep 17 00:00:00 2001 From: Brian Nixon Date: Wed, 8 Mar 2017 18:48:19 -0800 Subject: [PATCH 1/3] ZOOKEEPER-2725: Promote local session to global when ephemeral created in multi-op --- .../server/quorum/QuorumZooKeeperServer.java | 40 +++++++-- .../server/MultiOpSessionUpgradeTest.java | 87 +++++++++++++++++++ 2 files changed, 119 insertions(+), 8 deletions(-) create mode 100644 src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java index df7b4070748..4ff3b9ab0ee 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java @@ -23,6 +23,8 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.MultiTransactionRecord; +import org.apache.zookeeper.Op; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.ByteBufferInputStream; @@ -62,18 +64,40 @@ public Request checkUpgradeSession(Request request) // This is called by the request processor thread (either follower // or observer request processor), which is unique to a learner. // So will not be called concurrently by two threads. - if (request.type != OpCode.create || + if ((request.type != OpCode.create && request.type != OpCode.create2 && request.type != OpCode.multi) || !upgradeableSessionTracker.isLocalSession(request.sessionId)) { return null; } - CreateRequest createRequest = new CreateRequest(); - request.request.rewind(); - ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); - request.request.rewind(); - CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); - if (!createMode.isEphemeral()) { - return null; + + if (OpCode.multi == request.type) { + MultiTransactionRecord multiTransactionRecord = new MultiTransactionRecord(); + request.request.rewind(); + ByteBufferInputStream.byteBuffer2Record(request.request, multiTransactionRecord); + request.request.rewind(); + boolean containsEphemeralCreate = false; + for (Op op : multiTransactionRecord) { + if (op.getType() == OpCode.create || op.getType() == OpCode.create2) { + CreateRequest createRequest = (CreateRequest)op.toRequestRecord(); + CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); + if (createMode.isEphemeral()) { + containsEphemeralCreate = true; + } + } + } + if (!containsEphemeralCreate) { + return null; + } + } else { + CreateRequest createRequest = new CreateRequest(); + request.request.rewind(); + ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); + request.request.rewind(); + CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); + if (!createMode.isEphemeral()) { + return null; + } } + // Uh oh. We need to upgrade before we can proceed. if (!self.isLocalSessionsUpgradingEnabled()) { throw new KeeperException.EphemeralOnLocalSessionException(); diff --git a/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java b/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java new file mode 100644 index 00000000000..b33ee4bc6ab --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumBase; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class MultiOpSessionUpgradeTest extends QuorumBase { + protected static final Logger LOG = LoggerFactory.getLogger(MultiOpSessionUpgradeTest.class); + + @Override + public void setUp() throws Exception { + localSessionsEnabled = true; + localSessionsUpgradingEnabled = true; + super.setUp(); + } + + @Test + public void ephemeralCreateMultiOpTest() throws KeeperException, InterruptedException, IOException { + final ZooKeeper zk = createClient(); + waitForClient(zk, ZooKeeper.States.CONNECTED); + + String data = "test"; + String path = "/ephemeralcreatemultiop"; + zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + List multi = null; + LOG.info("RUNNING MULTI-OP"); + try { + multi = zk.multi(Arrays.asList( + Op.setData(path, data.getBytes(), 0), + Op.create(path + "/e", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL), + Op.create(path + "/p", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT), + Op.create(path + "/q", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) + )); + } catch (KeeperException.SessionExpiredException e) { + LOG.info("SESSION EXPIRED", e); + } + LOG.info("TESTING RESULTS"); + Assert.assertNotNull(multi); + Assert.assertEquals(4, multi.size()); + Assert.assertEquals(data, new String(zk.getData(path + "/e", false, null))); + Assert.assertEquals(data, new String(zk.getData(path + "/p", false, null))); + Assert.assertEquals(data, new String(zk.getData(path + "/q", false, null))); + } + + private void waitForClient(ZooKeeper zk, ZooKeeper.States state) throws InterruptedException { + for (int i = 0; i < ClientBase.CONNECTION_TIMEOUT / 1000; i++) { + if (state.equals(zk.getState())) { + return; + } + Thread.sleep(1000); + } + ClientBase.logAllStackTraces(); + throw new RuntimeException("Waiting too long for state " + state.name()); + } +} From 7f2fd37a2f9c755f107e537075398599849bf0c3 Mon Sep 17 00:00:00 2001 From: Brian Nixon Date: Mon, 20 Mar 2017 11:39:01 -0700 Subject: [PATCH 2/3] add direct testing of checkUpgradeSession in addition to integration style test --- .../server/quorum/QuorumZooKeeperServer.java | 1 + .../server/MultiOpSessionUpgradeTest.java | 72 +++++++++++++++---- 2 files changed, 61 insertions(+), 12 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java index 4ff3b9ab0ee..f6e4f1135b5 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java @@ -81,6 +81,7 @@ public Request checkUpgradeSession(Request request) CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (createMode.isEphemeral()) { containsEphemeralCreate = true; + break; } } } diff --git a/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java b/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java index b33ee4bc6ab..71adfe4f0b3 100644 --- a/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java +++ b/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java @@ -18,20 +18,28 @@ package org.apache.zookeeper.server; +import org.apache.jute.BinaryOutputArchive; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; import org.apache.zookeeper.OpResult; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.proto.GetDataRequest; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer; +import org.apache.zookeeper.server.quorum.UpgradeableSessionTracker; import org.apache.zookeeper.test.QuorumBase; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -48,14 +56,17 @@ public void setUp() throws Exception { @Test public void ephemeralCreateMultiOpTest() throws KeeperException, InterruptedException, IOException { final ZooKeeper zk = createClient(); - waitForClient(zk, ZooKeeper.States.CONNECTED); String data = "test"; String path = "/ephemeralcreatemultiop"; zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + QuorumZooKeeperServer server = getConnectedServer(zk.getSessionId()); + Assert.assertNotNull("unable to find server interlocutor", server); + UpgradeableSessionTracker sessionTracker = (UpgradeableSessionTracker)server.getSessionTracker(); + Assert.assertFalse("session already global", sessionTracker.isGlobalSession(zk.getSessionId())); + List multi = null; - LOG.info("RUNNING MULTI-OP"); try { multi = zk.multi(Arrays.asList( Op.setData(path, data.getBytes(), 0), @@ -64,24 +75,61 @@ public void ephemeralCreateMultiOpTest() throws KeeperException, InterruptedExce Op.create(path + "/q", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) )); } catch (KeeperException.SessionExpiredException e) { - LOG.info("SESSION EXPIRED", e); + // the scenario that inspired this unit test + Assert.fail("received session expired for a session promotion in a multi-op"); } - LOG.info("TESTING RESULTS"); + Assert.assertNotNull(multi); Assert.assertEquals(4, multi.size()); Assert.assertEquals(data, new String(zk.getData(path + "/e", false, null))); Assert.assertEquals(data, new String(zk.getData(path + "/p", false, null))); Assert.assertEquals(data, new String(zk.getData(path + "/q", false, null))); + Assert.assertTrue("session not promoted", sessionTracker.isGlobalSession(zk.getSessionId())); + } + + @Test + public void directCheckUpgradeSessionTest() throws IOException, InterruptedException, KeeperException { + final ZooKeeper zk = createClient(); + + String path = "/directcheckupgradesession"; + zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + QuorumZooKeeperServer server = getConnectedServer(zk.getSessionId()); + Assert.assertNotNull("unable to find server interlocutor", server); + + Request readRequest = makeGetDataRequest(path, zk.getSessionId()); + Request createRequest = makeCreateRequest(path + "/e", zk.getSessionId()); + Assert.assertNull("tried to upgrade on a read", server.checkUpgradeSession(readRequest)); + Assert.assertNotNull("failed to upgrade on a create", server.checkUpgradeSession(createRequest)); + Assert.assertNull("tried to upgrade after successful promotion", + server.checkUpgradeSession(createRequest)); + } + + private Request makeGetDataRequest(String path, long sessionId) throws IOException { + ByteArrayOutputStream boas = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas); + GetDataRequest getDataRequest = new GetDataRequest(path, false); + getDataRequest.serialize(boa, "request"); + ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, bb, new ArrayList<>()); + } + + private Request makeCreateRequest(String path, long sessionId) throws IOException { + ByteArrayOutputStream boas = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas); + CreateRequest createRequest = new CreateRequest(path, + "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); + createRequest.serialize(boa, "request"); + ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList<>()); } - private void waitForClient(ZooKeeper zk, ZooKeeper.States state) throws InterruptedException { - for (int i = 0; i < ClientBase.CONNECTION_TIMEOUT / 1000; i++) { - if (state.equals(zk.getState())) { - return; + private QuorumZooKeeperServer getConnectedServer(long sessionId) { + for (QuorumPeer peer : getPeerList()) { + if (peer.getActiveServer().getSessionTracker().isTrackingSession(sessionId)) { + return (QuorumZooKeeperServer)peer.getActiveServer(); } - Thread.sleep(1000); } - ClientBase.logAllStackTraces(); - throw new RuntimeException("Waiting too long for state " + state.name()); + return null; } } From 0a32b9ae8c5c7421039f53f5d36b10c32abebb7a Mon Sep 17 00:00:00 2001 From: Brian Nixon Date: Tue, 21 Mar 2017 08:46:14 -0700 Subject: [PATCH 3/3] fill diamond operators --- .../apache/zookeeper/server/MultiOpSessionUpgradeTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java b/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java index 71adfe4f0b3..0426e2293ef 100644 --- a/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java +++ b/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java @@ -25,6 +25,7 @@ import org.apache.zookeeper.OpResult; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Id; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.proto.GetDataRequest; import org.apache.zookeeper.server.quorum.QuorumPeer; @@ -111,7 +112,7 @@ private Request makeGetDataRequest(String path, long sessionId) throws IOExcepti GetDataRequest getDataRequest = new GetDataRequest(path, false); getDataRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, bb, new ArrayList<>()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, bb, new ArrayList()); } private Request makeCreateRequest(String path, long sessionId) throws IOException { @@ -121,7 +122,7 @@ private Request makeCreateRequest(String path, long sessionId) throws IOExceptio "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); createRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList<>()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList()); } private QuorumZooKeeperServer getConnectedServer(long sessionId) {