From 55d8ee643c0666fda2a5a2cf587c85f2ed3e2e58 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 09:13:28 +0200 Subject: [PATCH 01/12] Update dependencies --- blazingcache-core/pom.xml | 10 +++++++--- .../jcache/StandardKeySerializer.java | 6 ++++-- blazingcache-services/.DS_Store | Bin 0 -> 6148 bytes pom.xml | 14 +++++++------- 4 files changed, 18 insertions(+), 12 deletions(-) create mode 100644 blazingcache-services/.DS_Store diff --git a/blazingcache-core/pom.xml b/blazingcache-core/pom.xml index f6d2ac5..06e30e7 100644 --- a/blazingcache-core/pom.xml +++ b/blazingcache-core/pom.xml @@ -26,7 +26,7 @@ org.apache.felix maven-bundle-plugin - 3.2.0 + 4.2.1 true true @@ -70,6 +70,10 @@ jms javax.jms + + * + io.netty + jmxtools com.sun.jdmk @@ -88,7 +92,7 @@ org.apache.hadoop hadoop-minikdc - 3.1.1 + 3.2.1 test @@ -100,7 +104,7 @@ commons-io commons-io - 2.4 + 2.7 test diff --git a/blazingcache-jcache/src/main/java/blazingcache/jcache/StandardKeySerializer.java b/blazingcache-jcache/src/main/java/blazingcache/jcache/StandardKeySerializer.java index c61d5ab..b24a5c1 100644 --- a/blazingcache-jcache/src/main/java/blazingcache/jcache/StandardKeySerializer.java +++ b/blazingcache-jcache/src/main/java/blazingcache/jcache/StandardKeySerializer.java @@ -21,8 +21,10 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.Base64; +import java.util.Collections; +import java.util.Map; +import java.util.WeakHashMap; import javax.cache.CacheException; -import org.jboss.netty.util.internal.ConcurrentWeakKeyHashMap; /** * Standard keys serializer @@ -31,7 +33,7 @@ */ public class StandardKeySerializer implements Serializer { - private final ConcurrentWeakKeyHashMap notSerializableKeys = new ConcurrentWeakKeyHashMap<>(); + private final Map notSerializableKeys = Collections.synchronizedMap(new WeakHashMap<>()); @Override public String serialize(Object value) { diff --git a/blazingcache-services/.DS_Store b/blazingcache-services/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..4b4c8ff677e7e0b9ea4593af39ee44411877e767 GIT binary patch literal 6148 zcmeHK%}V4z5Uw6aO?E&c>_K4<4SVo1!ibJu!>sOUVa3RL*wry1!3l&!GCj^=7zjRq z51`lmd5~S-!6(`CR(1CX!>mU|r3$LQN_ACr@+EYo0YG#H@dkhg01hf)&BftALi40E zQnMaHp=%_dqGA6$%8FUTF-8 1.8 1.8 - 4.1.20.Final - 2.0.7.Final + 4.1.50.Final + 2.0.31.Final 3.1.0 4.12 - 1.9.11 - 3.4.8 - 2.8.0 + 1.9.13 + 3.6.1 + 5.0.0 1.0 3.1.0 - 3.1.8 - 0.8.2 + 4.0.4 + 0.8.5 From ec4d4cd3e1ce553f51613274b38a4d2d6b28df15 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 10:06:58 +0200 Subject: [PATCH 02/12] Fix --- blazingcache-core/pom.xml | 18 ++++++++- blazingcache-services/pom.xml | 39 +++++++++++++++++++ .../src/main/resources/bin/setenv.sh | 2 +- pom.xml | 6 ++- 4 files changed, 61 insertions(+), 4 deletions(-) diff --git a/blazingcache-core/pom.xml b/blazingcache-core/pom.xml index 06e30e7..a1a4393 100644 --- a/blazingcache-core/pom.xml +++ b/blazingcache-core/pom.xml @@ -88,11 +88,25 @@ + + + io.dropwizard.metrics + metrics-core + ${libs.metrics} + test + + + + org.xerial.snappy + snappy-java + ${libs.snappy} + test + org.apache.hadoop hadoop-minikdc - 3.2.1 + ${libs.minikdc} test @@ -104,7 +118,7 @@ commons-io commons-io - 2.7 + ${libs.commonsio} test diff --git a/blazingcache-services/pom.xml b/blazingcache-services/pom.xml index 8a6c763..48ffb26 100644 --- a/blazingcache-services/pom.xml +++ b/blazingcache-services/pom.xml @@ -72,6 +72,45 @@ blazingcache-core ${project.version} + + org.apache.zookeeper + zookeeper + ${libs.zookeeper} + + + jms + javax.jms + + + * + io.netty + + + jmxtools + com.sun.jdmk + + + jmxri + com.sun.jmx + + + slf4j-log4j12 + org.slf4j + + + + + + io.dropwizard.metrics + metrics-core + ${libs.metrics} + + + + org.xerial.snappy + snappy-java + ${libs.snappy} + junit junit diff --git a/blazingcache-services/src/main/resources/bin/setenv.sh b/blazingcache-services/src/main/resources/bin/setenv.sh index 2390a06..d1edf25 100644 --- a/blazingcache-services/src/main/resources/bin/setenv.sh +++ b/blazingcache-services/src/main/resources/bin/setenv.sh @@ -1,7 +1,7 @@ # Basic Environment and Java variables #JAVA_HOME= -JAVA_OPTS="-Xmx1g -Xms1g -XX:+UseParallelGC -XX:+AggressiveOpts -XX:+UseFastAccessorMethods -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=1g " +JAVA_OPTS="-Xmx1g -Xms1g -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=1g " if [ -z "$JAVA_HOME" ]; then diff --git a/pom.xml b/pom.xml index 3f3192f..2a150c6 100644 --- a/pom.xml +++ b/pom.xml @@ -71,12 +71,16 @@ 4.12 1.9.13 3.6.1 + 1.1.7.6 + 4.1.9 + 2.7 + 3.2.1 5.0.0 1.0 3.1.0 4.0.4 0.8.5 - + net.jcip From d5b601f03fde5144419557c9b08eb5548b118638 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 10:17:48 +0200 Subject: [PATCH 03/12] clean up zk tests --- .../server/SimpleZKSecureTest.java | 168 ++---------------- .../blazingcache/server/SimpleZKTest.java | 155 ++-------------- 2 files changed, 37 insertions(+), 286 deletions(-) diff --git a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java index 1bebc45..5d322eb 100644 --- a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java +++ b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java @@ -56,13 +56,12 @@ public static void clearUpJaas() { public void basicTest() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData hostData = new ServerHostData("localhost", 1234, "ciao", false, null); - try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); - CacheServer cacheServer = new CacheServer("ciao", hostData)) { + try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", hostData)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), hostData, true); cacheServer.start(); - try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); - CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = + new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -89,13 +88,12 @@ public void basicTest() throws Exception { public void sessionExpirationTest_SingleCacheServer() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData hostData = new ServerHostData("localhost", 1234, "ciao", false, null); - try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); - CacheServer cacheServer = new CacheServer("ciao", hostData)) { + try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", hostData)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), hostData, true); cacheServer.start(); - try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); - CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = + new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -115,24 +113,14 @@ public void sessionExpirationTest_SingleCacheServer() throws Exception { /* * Make's ZooKeeper's session expire: * - * this is the session id and password to use on a second zookeeper - * handle so as to make service monitor's handle to expire + * this is the session id and password to use on a second zookeeper handle so as to make service monitor's handle to expire */ - final long serviceZKSessionId = cacheServer.getZooKeeper().getSessionId(); - final byte[] serviceZKpasswd = cacheServer.getZooKeeper().getSessionPasswd(); - - CountdownWatcher watch2 = new CountdownWatcher("zkexpire"); - // make session on cache server's cluster manager zk handle expire - final ZooKeeper zk = new ZooKeeper(zkEnv.getAddress(), zkEnv.getTimeout(), watch2, - serviceZKSessionId, serviceZKpasswd); - watch2.waitForConnected(10000); - zk.close(); + cacheServer.getZooKeeper().getTestable().injectSessionExpiration(); //first things first, make sure leadership is lost: state change ts has changed waitForCondition(() -> { return cacheServer.getStateChangeTimestamp() > lastStateChangeTS; }, 100); - //when fake zk handle expires we are sure that origina cache server session is going to expire - watch2.waitForExpired(10000); + //first things first, make sure leadership is acquired again waitForCondition(() -> { return cacheServer.isLeader(); @@ -152,12 +140,11 @@ public void sessionExpirationTest_BackupServer() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); final ServerHostData leaderHostdata = new ServerHostData("localhost", 1234, "leader", false, null); final ServerHostData backupHostdata = new ServerHostData("localhost", 1235, "backup", false, null); - try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); - CacheServer cacheServer = new CacheServer("ciao", leaderHostdata); - CacheServer cacheServerBk = new CacheServer("ciao", backupHostdata)) { + try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", leaderHostdata); CacheServer cacheServerBk = new CacheServer("ciao", + backupHostdata)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), - zkEnv.getPath(), leaderHostdata, true); + zkEnv.getPath(), leaderHostdata, true); cacheServer.start(); waitForCondition(() -> { return cacheServer.isLeader(); @@ -165,11 +152,11 @@ public void sessionExpirationTest_BackupServer() throws Exception { //start backupcluster: we are sure this is in backup mode cacheServerBk.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), - zkEnv.getPath(), backupHostdata, true); + zkEnv.getPath(), backupHostdata, true); cacheServerBk.start(); - try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); - CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = + new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -186,27 +173,14 @@ public void sessionExpirationTest_BackupServer() throws Exception { assertEquals(1, client2.getCacheSize()); final long lastStateChangeTS = cacheServer.getStateChangeTimestamp(); - /* - * Make's ZooKeeper's session expire: - * - * this is the session id and password to use on a second zookeeper - * handle so as to make service monitor's handle to expire - */ - final long serviceZKSessionId = cacheServer.getZooKeeper().getSessionId(); - final byte[] serviceZKpasswd = cacheServer.getZooKeeper().getSessionPasswd(); - - CountdownWatcher watch2 = new CountdownWatcher("zkexpire"); - // make session on cache server's cluster manager zk handle expire - final ZooKeeper zk = new ZooKeeper(zkEnv.getAddress(), zkEnv.getTimeout(), watch2, - serviceZKSessionId, serviceZKpasswd); - watch2.waitForConnected(10000); - zk.close(); + + cacheServer.getZooKeeper().getTestable().injectSessionExpiration(); + //first things first, make sure leadership is lost: state change ts has changed waitForCondition(() -> { return cacheServer.getStateChangeTimestamp() > lastStateChangeTS; }, 100); - //when fake zk handle expires we are sure that original cache server session is going to expire - watch2.waitForExpired(10000); + //first things first, make sure leadership is acquired again waitForCondition(() -> { return client1.getCacheSize() == 0 && client2.getCacheSize() == 0; @@ -254,117 +228,13 @@ public static void waitForCondition(Callable condition, int seconds) th Thread.sleep(100); } } catch (InterruptedException ee) { - printStackTrace(ee); Assert.fail("test interrupted!"); return; } catch (Exception ee) { - printStackTrace(ee); Assert.fail("error while evalutaing condition:" + ee); return; } Assert.fail("condition not met in time!"); } - public static void printStackTrace(Throwable t) { - t.printStackTrace(); - } - - private void waitForLeadershipState(final CacheServer server, final boolean leaderState) { - - } - - private static class CountdownWatcher implements Watcher { - - protected static final Logger LOG - = Logger.getLogger("" + CountdownWatcher.class); - - private final String name; - private CountDownLatch clientConnected; - private KeeperState state; - private boolean connected; - private boolean expired; - - public CountdownWatcher(String name) { - this.name = name; - reset(); - } - - private synchronized void reset() { - clientConnected = new CountDownLatch(1); - state = KeeperState.Disconnected; - connected = false; - expired = false; - } - - public synchronized void process(WatchedEvent event) { - LOG.info("Watcher " + name + " got event " + event); - - state = event.getState(); - if (state == KeeperState.SyncConnected) { - connected = true; - clientConnected.countDown(); - } else if (state == KeeperState.SaslAuthenticated) { - } else { - connected = false; - } - if (state == KeeperState.Expired) { - expired = true; - } else if (state == KeeperState.SaslAuthenticated) { - } else { - expired = false; - } - notifyAll(); - } - - public synchronized boolean isConnected() { - return connected; - } - - public synchronized KeeperState state() { - return state; - } - - public synchronized void waitForConnected(long timeout) - throws InterruptedException, TimeoutException { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while (!connected && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (!connected) { - throw new TimeoutException("Did not connect"); - - } - } - - public synchronized void waitForDisconnected(long timeout) - throws InterruptedException, TimeoutException { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while (connected && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (connected) { - throw new TimeoutException("Did not disconnect"); - - } - } - - public synchronized void waitForExpired(long timeout) - throws InterruptedException, TimeoutException { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while (!expired && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (!connected) { - throw new TimeoutException("Did not disconnect"); - - } - } - } - } diff --git a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java index 560c7da..caa4ca4 100644 --- a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java +++ b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java @@ -43,13 +43,12 @@ public class SimpleZKTest { public void basicTest() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData hostData = new ServerHostData("localhost", 1234, "ciao", false, null); - try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); - CacheServer cacheServer = new CacheServer("ciao", hostData)) { + try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", hostData)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), hostData, false); cacheServer.start(); - try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); - CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = + new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -76,13 +75,12 @@ public void basicTest() throws Exception { public void sessionExpirationTest_SingleCacheServer() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData hostData = new ServerHostData("localhost", 1234, "ciao", false, null); - try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); - CacheServer cacheServer = new CacheServer("ciao", hostData)) { + try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", hostData)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), hostData, false); cacheServer.start(); - try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); - CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = + new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -102,24 +100,16 @@ public void sessionExpirationTest_SingleCacheServer() throws Exception { /* * Make's ZooKeeper's session expire: * - * this is the session id and password to use on a second zookeeper - * handle so as to make service monitor's handle to expire + * this is the session id and password to use on a second zookeeper handle so as to make service monitor's handle to expire */ - final long serviceZKSessionId = cacheServer.getZooKeeper().getSessionId(); - final byte[] serviceZKpasswd = cacheServer.getZooKeeper().getSessionPasswd(); - - CountdownWatcher watch2 = new CountdownWatcher("zkexpire"); - // make session on cache server's cluster manager zk handle expire - final ZooKeeper zk = new ZooKeeper(zkEnv.getAddress(), zkEnv.getTimeout(), watch2, - serviceZKSessionId, serviceZKpasswd); - watch2.waitForConnected(10000); - zk.close(); + + cacheServer.getZooKeeper().getTestable().injectSessionExpiration(); + //first things first, make sure leadership is lost: state change ts has changed waitForCondition(() -> { return cacheServer.getStateChangeTimestamp() > lastStateChangeTS; }, 100); - //when fake zk handle expires we are sure that origina cache server session is going to expire - watch2.waitForExpired(10000); + //first things first, make sure leadership is acquired again waitForCondition(() -> { return cacheServer.isLeader(); @@ -139,9 +129,8 @@ public void sessionExpirationTest_BackupServer() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); final ServerHostData leaderHostdata = new ServerHostData("localhost", 1234, "leader", false, null); final ServerHostData backupHostdata = new ServerHostData("localhost", 1235, "backup", false, null); - try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); - CacheServer cacheServer = new CacheServer("ciao", leaderHostdata); - CacheServer cacheServerBk = new CacheServer("ciao", backupHostdata)) { + try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", leaderHostdata); CacheServer cacheServerBk = new CacheServer("ciao", + backupHostdata)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), leaderHostdata, false); @@ -155,8 +144,8 @@ public void sessionExpirationTest_BackupServer() throws Exception { zkEnv.getPath(), backupHostdata, false); cacheServerBk.start(); - try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); - CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = + new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -176,24 +165,14 @@ public void sessionExpirationTest_BackupServer() throws Exception { /* * Make's ZooKeeper's session expire: * - * this is the session id and password to use on a second zookeeper - * handle so as to make service monitor's handle to expire + * this is the session id and password to use on a second zookeeper handle so as to make service monitor's handle to expire */ - final long serviceZKSessionId = cacheServer.getZooKeeper().getSessionId(); - final byte[] serviceZKpasswd = cacheServer.getZooKeeper().getSessionPasswd(); - - CountdownWatcher watch2 = new CountdownWatcher("zkexpire"); - // make session on cache server's cluster manager zk handle expire - final ZooKeeper zk = new ZooKeeper(zkEnv.getAddress(), zkEnv.getTimeout(), watch2, - serviceZKSessionId, serviceZKpasswd); - watch2.waitForConnected(10000); - zk.close(); + cacheServer.getZooKeeper().getTestable().injectSessionExpiration(); //first things first, make sure leadership is lost: state change ts has changed waitForCondition(() -> { return cacheServer.getStateChangeTimestamp() > lastStateChangeTS; }, 100); - //when fake zk handle expires we are sure that original cache server session is going to expire - watch2.waitForExpired(10000); + //first things first, make sure leadership is acquired again waitForCondition(() -> { return client1.getCacheSize() == 0 && client2.getCacheSize() == 0; @@ -241,111 +220,13 @@ public static void waitForCondition(Callable condition, int seconds) th Thread.sleep(100); } } catch (InterruptedException ee) { - printStackTrace(ee); Assert.fail("test interrupted!"); return; } catch (Exception ee) { - printStackTrace(ee); Assert.fail("error while evalutaing condition:" + ee); return; } Assert.fail("condition not met in time!"); } - public static void printStackTrace(Throwable t) { - t.printStackTrace(); - } - - private void waitForLeadershipState(final CacheServer server, final boolean leaderState) { - - } - - private static class CountdownWatcher implements Watcher { - protected static final Logger LOG = - Logger.getLogger("" + CountdownWatcher.class); - - private final String name; - private CountDownLatch clientConnected; - private KeeperState state; - private boolean connected; - private boolean expired; - - public CountdownWatcher(String name) { - this.name = name; - reset(); - } - private synchronized void reset() { - clientConnected = new CountDownLatch(1); - state = KeeperState.Disconnected; - connected = false; - expired = false; - } - public synchronized void process(WatchedEvent event) { - LOG.info("Watcher " + name + " got event " + event); - - state = event.getState(); - if (state == KeeperState.SyncConnected) { - connected = true; - clientConnected.countDown(); - } else { - connected = false; - } - if (state == KeeperState.Expired) { - expired = true; - } else { - expired = false; - } - notifyAll(); - } - public synchronized boolean isConnected() { - return connected; - } - public synchronized KeeperState state() { - return state; - } - public synchronized void waitForConnected(long timeout) - throws InterruptedException, TimeoutException - { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while(!connected && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (!connected) { - throw new TimeoutException("Did not connect"); - - } - } - public synchronized void waitForDisconnected(long timeout) - throws InterruptedException, TimeoutException - { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while(connected && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (connected) { - throw new TimeoutException("Did not disconnect"); - - } - } - - public synchronized void waitForExpired(long timeout) - throws InterruptedException, TimeoutException - { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while(!expired && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (!connected) { - throw new TimeoutException("Did not disconnect"); - - } - } - } - } From 9d3affa6f95c9247753c62ab685c9a369a840f98 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 11:00:37 +0200 Subject: [PATCH 04/12] restore tests --- .../server/SimpleZKSecureTest.java | 168 ++++++++++++++++-- .../blazingcache/server/SimpleZKTest.java | 155 ++++++++++++++-- 2 files changed, 286 insertions(+), 37 deletions(-) diff --git a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java index 5d322eb..1bebc45 100644 --- a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java +++ b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java @@ -56,12 +56,13 @@ public static void clearUpJaas() { public void basicTest() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData hostData = new ServerHostData("localhost", 1234, "ciao", false, null); - try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", hostData)) { + try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); + CacheServer cacheServer = new CacheServer("ciao", hostData)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), hostData, true); cacheServer.start(); - try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = - new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); + CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -88,12 +89,13 @@ public void basicTest() throws Exception { public void sessionExpirationTest_SingleCacheServer() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData hostData = new ServerHostData("localhost", 1234, "ciao", false, null); - try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", hostData)) { + try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); + CacheServer cacheServer = new CacheServer("ciao", hostData)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), hostData, true); cacheServer.start(); - try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = - new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); + CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -113,14 +115,24 @@ public void sessionExpirationTest_SingleCacheServer() throws Exception { /* * Make's ZooKeeper's session expire: * - * this is the session id and password to use on a second zookeeper handle so as to make service monitor's handle to expire + * this is the session id and password to use on a second zookeeper + * handle so as to make service monitor's handle to expire */ - cacheServer.getZooKeeper().getTestable().injectSessionExpiration(); + final long serviceZKSessionId = cacheServer.getZooKeeper().getSessionId(); + final byte[] serviceZKpasswd = cacheServer.getZooKeeper().getSessionPasswd(); + + CountdownWatcher watch2 = new CountdownWatcher("zkexpire"); + // make session on cache server's cluster manager zk handle expire + final ZooKeeper zk = new ZooKeeper(zkEnv.getAddress(), zkEnv.getTimeout(), watch2, + serviceZKSessionId, serviceZKpasswd); + watch2.waitForConnected(10000); + zk.close(); //first things first, make sure leadership is lost: state change ts has changed waitForCondition(() -> { return cacheServer.getStateChangeTimestamp() > lastStateChangeTS; }, 100); - + //when fake zk handle expires we are sure that origina cache server session is going to expire + watch2.waitForExpired(10000); //first things first, make sure leadership is acquired again waitForCondition(() -> { return cacheServer.isLeader(); @@ -140,11 +152,12 @@ public void sessionExpirationTest_BackupServer() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); final ServerHostData leaderHostdata = new ServerHostData("localhost", 1234, "leader", false, null); final ServerHostData backupHostdata = new ServerHostData("localhost", 1235, "backup", false, null); - try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", leaderHostdata); CacheServer cacheServerBk = new CacheServer("ciao", - backupHostdata)) { + try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); + CacheServer cacheServer = new CacheServer("ciao", leaderHostdata); + CacheServer cacheServerBk = new CacheServer("ciao", backupHostdata)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), - zkEnv.getPath(), leaderHostdata, true); + zkEnv.getPath(), leaderHostdata, true); cacheServer.start(); waitForCondition(() -> { return cacheServer.isLeader(); @@ -152,11 +165,11 @@ public void sessionExpirationTest_BackupServer() throws Exception { //start backupcluster: we are sure this is in backup mode cacheServerBk.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), - zkEnv.getPath(), backupHostdata, true); + zkEnv.getPath(), backupHostdata, true); cacheServerBk.start(); - try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = - new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); + CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -173,14 +186,27 @@ public void sessionExpirationTest_BackupServer() throws Exception { assertEquals(1, client2.getCacheSize()); final long lastStateChangeTS = cacheServer.getStateChangeTimestamp(); - - cacheServer.getZooKeeper().getTestable().injectSessionExpiration(); - + /* + * Make's ZooKeeper's session expire: + * + * this is the session id and password to use on a second zookeeper + * handle so as to make service monitor's handle to expire + */ + final long serviceZKSessionId = cacheServer.getZooKeeper().getSessionId(); + final byte[] serviceZKpasswd = cacheServer.getZooKeeper().getSessionPasswd(); + + CountdownWatcher watch2 = new CountdownWatcher("zkexpire"); + // make session on cache server's cluster manager zk handle expire + final ZooKeeper zk = new ZooKeeper(zkEnv.getAddress(), zkEnv.getTimeout(), watch2, + serviceZKSessionId, serviceZKpasswd); + watch2.waitForConnected(10000); + zk.close(); //first things first, make sure leadership is lost: state change ts has changed waitForCondition(() -> { return cacheServer.getStateChangeTimestamp() > lastStateChangeTS; }, 100); - + //when fake zk handle expires we are sure that original cache server session is going to expire + watch2.waitForExpired(10000); //first things first, make sure leadership is acquired again waitForCondition(() -> { return client1.getCacheSize() == 0 && client2.getCacheSize() == 0; @@ -228,13 +254,117 @@ public static void waitForCondition(Callable condition, int seconds) th Thread.sleep(100); } } catch (InterruptedException ee) { + printStackTrace(ee); Assert.fail("test interrupted!"); return; } catch (Exception ee) { + printStackTrace(ee); Assert.fail("error while evalutaing condition:" + ee); return; } Assert.fail("condition not met in time!"); } + public static void printStackTrace(Throwable t) { + t.printStackTrace(); + } + + private void waitForLeadershipState(final CacheServer server, final boolean leaderState) { + + } + + private static class CountdownWatcher implements Watcher { + + protected static final Logger LOG + = Logger.getLogger("" + CountdownWatcher.class); + + private final String name; + private CountDownLatch clientConnected; + private KeeperState state; + private boolean connected; + private boolean expired; + + public CountdownWatcher(String name) { + this.name = name; + reset(); + } + + private synchronized void reset() { + clientConnected = new CountDownLatch(1); + state = KeeperState.Disconnected; + connected = false; + expired = false; + } + + public synchronized void process(WatchedEvent event) { + LOG.info("Watcher " + name + " got event " + event); + + state = event.getState(); + if (state == KeeperState.SyncConnected) { + connected = true; + clientConnected.countDown(); + } else if (state == KeeperState.SaslAuthenticated) { + } else { + connected = false; + } + if (state == KeeperState.Expired) { + expired = true; + } else if (state == KeeperState.SaslAuthenticated) { + } else { + expired = false; + } + notifyAll(); + } + + public synchronized boolean isConnected() { + return connected; + } + + public synchronized KeeperState state() { + return state; + } + + public synchronized void waitForConnected(long timeout) + throws InterruptedException, TimeoutException { + long expire = System.currentTimeMillis() + timeout; + long left = timeout; + while (!connected && left > 0) { + wait(left); + left = expire - System.currentTimeMillis(); + } + if (!connected) { + throw new TimeoutException("Did not connect"); + + } + } + + public synchronized void waitForDisconnected(long timeout) + throws InterruptedException, TimeoutException { + long expire = System.currentTimeMillis() + timeout; + long left = timeout; + while (connected && left > 0) { + wait(left); + left = expire - System.currentTimeMillis(); + } + if (connected) { + throw new TimeoutException("Did not disconnect"); + + } + } + + public synchronized void waitForExpired(long timeout) + throws InterruptedException, TimeoutException { + long expire = System.currentTimeMillis() + timeout; + long left = timeout; + while (!expired && left > 0) { + wait(left); + left = expire - System.currentTimeMillis(); + } + if (!connected) { + throw new TimeoutException("Did not disconnect"); + + } + } + } + } diff --git a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java index caa4ca4..560c7da 100644 --- a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java +++ b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java @@ -43,12 +43,13 @@ public class SimpleZKTest { public void basicTest() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData hostData = new ServerHostData("localhost", 1234, "ciao", false, null); - try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", hostData)) { + try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); + CacheServer cacheServer = new CacheServer("ciao", hostData)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), hostData, false); cacheServer.start(); - try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = - new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); + CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -75,12 +76,13 @@ public void basicTest() throws Exception { public void sessionExpirationTest_SingleCacheServer() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData hostData = new ServerHostData("localhost", 1234, "ciao", false, null); - try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", hostData)) { + try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); + CacheServer cacheServer = new CacheServer("ciao", hostData)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), hostData, false); cacheServer.start(); - try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = - new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); + CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -100,16 +102,24 @@ public void sessionExpirationTest_SingleCacheServer() throws Exception { /* * Make's ZooKeeper's session expire: * - * this is the session id and password to use on a second zookeeper handle so as to make service monitor's handle to expire + * this is the session id and password to use on a second zookeeper + * handle so as to make service monitor's handle to expire */ - - cacheServer.getZooKeeper().getTestable().injectSessionExpiration(); - + final long serviceZKSessionId = cacheServer.getZooKeeper().getSessionId(); + final byte[] serviceZKpasswd = cacheServer.getZooKeeper().getSessionPasswd(); + + CountdownWatcher watch2 = new CountdownWatcher("zkexpire"); + // make session on cache server's cluster manager zk handle expire + final ZooKeeper zk = new ZooKeeper(zkEnv.getAddress(), zkEnv.getTimeout(), watch2, + serviceZKSessionId, serviceZKpasswd); + watch2.waitForConnected(10000); + zk.close(); //first things first, make sure leadership is lost: state change ts has changed waitForCondition(() -> { return cacheServer.getStateChangeTimestamp() > lastStateChangeTS; }, 100); - + //when fake zk handle expires we are sure that origina cache server session is going to expire + watch2.waitForExpired(10000); //first things first, make sure leadership is acquired again waitForCondition(() -> { return cacheServer.isLeader(); @@ -129,8 +139,9 @@ public void sessionExpirationTest_BackupServer() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); final ServerHostData leaderHostdata = new ServerHostData("localhost", 1234, "leader", false, null); final ServerHostData backupHostdata = new ServerHostData("localhost", 1235, "backup", false, null); - try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", leaderHostdata); CacheServer cacheServerBk = new CacheServer("ciao", - backupHostdata)) { + try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); + CacheServer cacheServer = new CacheServer("ciao", leaderHostdata); + CacheServer cacheServerBk = new CacheServer("ciao", backupHostdata)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), leaderHostdata, false); @@ -144,8 +155,8 @@ public void sessionExpirationTest_BackupServer() throws Exception { zkEnv.getPath(), backupHostdata, false); cacheServerBk.start(); - try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = - new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); + CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -165,14 +176,24 @@ public void sessionExpirationTest_BackupServer() throws Exception { /* * Make's ZooKeeper's session expire: * - * this is the session id and password to use on a second zookeeper handle so as to make service monitor's handle to expire + * this is the session id and password to use on a second zookeeper + * handle so as to make service monitor's handle to expire */ - cacheServer.getZooKeeper().getTestable().injectSessionExpiration(); + final long serviceZKSessionId = cacheServer.getZooKeeper().getSessionId(); + final byte[] serviceZKpasswd = cacheServer.getZooKeeper().getSessionPasswd(); + + CountdownWatcher watch2 = new CountdownWatcher("zkexpire"); + // make session on cache server's cluster manager zk handle expire + final ZooKeeper zk = new ZooKeeper(zkEnv.getAddress(), zkEnv.getTimeout(), watch2, + serviceZKSessionId, serviceZKpasswd); + watch2.waitForConnected(10000); + zk.close(); //first things first, make sure leadership is lost: state change ts has changed waitForCondition(() -> { return cacheServer.getStateChangeTimestamp() > lastStateChangeTS; }, 100); - + //when fake zk handle expires we are sure that original cache server session is going to expire + watch2.waitForExpired(10000); //first things first, make sure leadership is acquired again waitForCondition(() -> { return client1.getCacheSize() == 0 && client2.getCacheSize() == 0; @@ -220,13 +241,111 @@ public static void waitForCondition(Callable condition, int seconds) th Thread.sleep(100); } } catch (InterruptedException ee) { + printStackTrace(ee); Assert.fail("test interrupted!"); return; } catch (Exception ee) { + printStackTrace(ee); Assert.fail("error while evalutaing condition:" + ee); return; } Assert.fail("condition not met in time!"); } + public static void printStackTrace(Throwable t) { + t.printStackTrace(); + } + + private void waitForLeadershipState(final CacheServer server, final boolean leaderState) { + + } + + private static class CountdownWatcher implements Watcher { + protected static final Logger LOG = + Logger.getLogger("" + CountdownWatcher.class); + + private final String name; + private CountDownLatch clientConnected; + private KeeperState state; + private boolean connected; + private boolean expired; + + public CountdownWatcher(String name) { + this.name = name; + reset(); + } + private synchronized void reset() { + clientConnected = new CountDownLatch(1); + state = KeeperState.Disconnected; + connected = false; + expired = false; + } + public synchronized void process(WatchedEvent event) { + LOG.info("Watcher " + name + " got event " + event); + + state = event.getState(); + if (state == KeeperState.SyncConnected) { + connected = true; + clientConnected.countDown(); + } else { + connected = false; + } + if (state == KeeperState.Expired) { + expired = true; + } else { + expired = false; + } + notifyAll(); + } + public synchronized boolean isConnected() { + return connected; + } + public synchronized KeeperState state() { + return state; + } + public synchronized void waitForConnected(long timeout) + throws InterruptedException, TimeoutException + { + long expire = System.currentTimeMillis() + timeout; + long left = timeout; + while(!connected && left > 0) { + wait(left); + left = expire - System.currentTimeMillis(); + } + if (!connected) { + throw new TimeoutException("Did not connect"); + + } + } + public synchronized void waitForDisconnected(long timeout) + throws InterruptedException, TimeoutException + { + long expire = System.currentTimeMillis() + timeout; + long left = timeout; + while(connected && left > 0) { + wait(left); + left = expire - System.currentTimeMillis(); + } + if (connected) { + throw new TimeoutException("Did not disconnect"); + + } + } + + public synchronized void waitForExpired(long timeout) + throws InterruptedException, TimeoutException + { + long expire = System.currentTimeMillis() + timeout; + long left = timeout; + while(!expired && left > 0) { + wait(left); + left = expire - System.currentTimeMillis(); + } + if (!connected) { + throw new TimeoutException("Did not disconnect"); + + } + } + } + } From 9e893a46c68b373d56d046e9e8ff0ddb552b0073 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 11:02:19 +0200 Subject: [PATCH 05/12] upgrade plugins --- pom.xml | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 2a150c6..3bbaf6f 100644 --- a/pom.xml +++ b/pom.xml @@ -100,7 +100,7 @@ maven-site-plugin - 3.7.1 + 3.9.1 @@ -160,18 +160,17 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M3 + 3.0.0-M5 @{argLine} -Xmx1024m -Djava.net.preferIpv4Stack=true 1 false - true org.apache.maven.plugins maven-compiler-plugin - 3.8.0 + 3.8.1 ${project.build.sourceEncoding} @@ -179,7 +178,7 @@ org.apache.maven.plugins maven-resources-plugin - 2.7 + 3.1.0 ${project.build.sourceEncoding} @@ -219,7 +218,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M3 + 3.0.0-M5 -Xmx1024m -Djava.net.preferIpv4Stack=true 1 @@ -229,7 +228,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.8.0 + 3.8.1 ${project.build.sourceEncoding} @@ -256,7 +255,7 @@ org.apache.maven.plugins maven-source-plugin - 3.0.1 + 3.2.1 attach-sources @@ -270,7 +269,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.0.1 + 3.2.0 attach-javadocs From 7b237896d83a79110dfdd15141f662d08cd69144 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 13:41:15 +0200 Subject: [PATCH 06/12] more zk clean up --- blazingcache-core/pom.xml | 17 +++ .../security/sasl/SaslNettyClient.java | 2 +- .../server/JVMServersRegistry.java | 2 +- .../TestingZookeeperServerEmbedded.java | 144 ------------------ .../src/test/java/blazingcache/ZKTestEnv.java | 18 ++- .../blazingcache/server/CountdownWatcher.java | 102 +++++++++++++ .../server/SimpleZKSecureTest.java | 98 ------------ .../blazingcache/server/SimpleZKTest.java | 123 ++------------- pom.xml | 3 +- 9 files changed, 149 insertions(+), 360 deletions(-) delete mode 100644 blazingcache-core/src/test/java/blazingcache/TestingZookeeperServerEmbedded.java create mode 100644 blazingcache-core/src/test/java/blazingcache/server/CountdownWatcher.java diff --git a/blazingcache-core/pom.xml b/blazingcache-core/pom.xml index a1a4393..f72eaa5 100644 --- a/blazingcache-core/pom.xml +++ b/blazingcache-core/pom.xml @@ -121,5 +121,22 @@ ${libs.commonsio} test + + org.apache.curator + curator-test + ${libs.curator} + test + + + org.slf4j + slf4j-api + ${libs.slf4j} + + + org.slf4j + slf4j-jdk14 + ${libs.slf4j} + test + diff --git a/blazingcache-core/src/main/java/blazingcache/security/sasl/SaslNettyClient.java b/blazingcache-core/src/main/java/blazingcache/security/sasl/SaslNettyClient.java index 41a4670..4912ff5 100644 --- a/blazingcache-core/src/main/java/blazingcache/security/sasl/SaslNettyClient.java +++ b/blazingcache-core/src/main/java/blazingcache/security/sasl/SaslNettyClient.java @@ -130,7 +130,7 @@ private Subject loginClient() throws SaslException, PrivilegedActionException, L String clientSection = "BlazingCacheClient"; AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(clientSection); if (entries == null) { - LOG.log(Level.SEVERE, "No JAAS Configuration found with section BlazingCacheClient"); + LOG.log(Level.INFO, "No JAAS Configuration found with section BlazingCacheClient"); return null; } try { diff --git a/blazingcache-core/src/main/java/blazingcache/server/JVMServersRegistry.java b/blazingcache-core/src/main/java/blazingcache/server/JVMServersRegistry.java index 219d8a2..942e580 100644 --- a/blazingcache-core/src/main/java/blazingcache/server/JVMServersRegistry.java +++ b/blazingcache-core/src/main/java/blazingcache/server/JVMServersRegistry.java @@ -35,7 +35,7 @@ public class JVMServersRegistry { private static String lastRegisteredServer = ""; public static void registerServer(String id, CacheServer broker) { - LOGGER.log(Level.SEVERE, "registerServer {0}", id); + LOGGER.log(Level.INFO, "registerServer {0}", id); servers.put(id, broker); lastRegisteredServer = id; } diff --git a/blazingcache-core/src/test/java/blazingcache/TestingZookeeperServerEmbedded.java b/blazingcache-core/src/test/java/blazingcache/TestingZookeeperServerEmbedded.java deleted file mode 100644 index 78d0f61..0000000 --- a/blazingcache-core/src/test/java/blazingcache/TestingZookeeperServerEmbedded.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - Licensed to Diennea S.r.l. under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. Diennea S.r.l. licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - - */ -package blazingcache; - -import java.io.File; -import java.lang.reflect.Field; -import java.util.Properties; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.ZooKeeperServerMain; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.apache.zookeeper.server.quorum.QuorumPeerMain; - -/** - * Simple ZookKeeper Server, with SASL support - * - * @author enrico.olivelli - */ -public class TestingZookeeperServerEmbedded implements AutoCloseable { - - QuorumPeerConfig config; - QuorumPeerMain maincluster; - ZooKeeperServerMain mainsingle; - Thread thread; - ServerCnxnFactory cnxnFactory; - private int clientPort; - - public TestingZookeeperServerEmbedded(int clientPort, File baseDir) throws Exception { - this.clientPort = clientPort; - Properties p = new Properties(); - String host = "localhost"; - String dataDir = "data"; - File dir = new File(baseDir, dataDir); - p.setProperty("syncEnabled", "false"); - p.setProperty("dataDir", dir.getAbsolutePath()); - p.setProperty("clientPort", clientPort + ""); - p.setProperty("authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider"); - p.setProperty("kerberos.removeHostFromPrincipal", "true"); - p.setProperty("kerberos.removeRealmFromPrincipal", "true"); - - config = new QuorumPeerConfig(); - - config.parseProperties(p); - - } - - public String getConnectString() { - return "localhost:" + clientPort; - } - - public void start() throws Exception { - - mainsingle = new ZooKeeperServerMain(); - - thread = new Thread("zkservermainrunner") { - @Override - public void run() { - try { - ServerConfig cc = new ServerConfig(); - cc.readFrom(config); - mainsingle.runFromConfig(cc); - System.out.println("ZK server died"); - } catch (Throwable t) { - t.printStackTrace(); - } - } - }; - thread.start(); - - this.cnxnFactory = getServerConnectionFactory(); - if (cnxnFactory != null) { - final ZooKeeperServer zkServer = getZooKeeperServer(cnxnFactory); - if (zkServer != null) { - synchronized (zkServer) { - if (!zkServer.isRunning()) { - zkServer.wait(); - } - } - } - } - - } - - private ServerCnxnFactory getServerConnectionFactory() throws Exception { - Field cnxnFactoryField = ZooKeeperServerMain.class.getDeclaredField("cnxnFactory"); - cnxnFactoryField.setAccessible(true); - ServerCnxnFactory cnxnFactory; - - // Wait until the cnxnFactory field is non-null or up to 1s, whichever comes first. - long startTime = System.currentTimeMillis(); - do { - cnxnFactory = (ServerCnxnFactory) cnxnFactoryField.get(mainsingle); - } while ((cnxnFactory == null) && ((System.currentTimeMillis() - startTime) < 10000)); - - return cnxnFactory; - } - - private ZooKeeperServer getZooKeeperServer(ServerCnxnFactory cnxnFactory) throws Exception { - Field zkServerField = ServerCnxnFactory.class.getDeclaredField("zkServer"); - zkServerField.setAccessible(true); - ZooKeeperServer zkServer; - - // Wait until the zkServer field is non-null or up to 1s, whichever comes first. - long startTime = System.currentTimeMillis(); - do { - zkServer = (ZooKeeperServer) zkServerField.get(cnxnFactory); - } while ((zkServer == null) && ((System.currentTimeMillis() - startTime) < 10000)); - - return zkServer; - } - - @Override - public void close() { - if (cnxnFactory != null) { - cnxnFactory.shutdown(); - } - - if (thread != null) { - thread.interrupt(); - try { - thread.join(); - } catch (InterruptedException ex) { - } - } - } -} diff --git a/blazingcache-core/src/test/java/blazingcache/ZKTestEnv.java b/blazingcache-core/src/test/java/blazingcache/ZKTestEnv.java index e4a57cb..79b14c9 100644 --- a/blazingcache-core/src/test/java/blazingcache/ZKTestEnv.java +++ b/blazingcache-core/src/test/java/blazingcache/ZKTestEnv.java @@ -1,17 +1,25 @@ package blazingcache; import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingServer; public class ZKTestEnv implements AutoCloseable { - TestingZookeeperServerEmbedded zkServer; - - Path path; + final TestingServer zkServer; public ZKTestEnv(final Path path) throws Exception { - zkServer = new TestingZookeeperServerEmbedded(1281, path.toFile()); + Map customProperties = new HashMap<>(); + customProperties.put("authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider"); + customProperties.put("kerberos.removeHostFromPrincipal", "true"); + customProperties.put("kerberos.removeRealmFromPrincipal", "true"); + customProperties.put("syncEnabled", "false"); + InstanceSpec spec = new InstanceSpec(path.toFile(), 1111, 2222, 2223, false, 1, 1000, 100, + customProperties, "localhost"); + zkServer = new TestingServer(spec, false); zkServer.start(); - this.path = path; } public String getAddress() { diff --git a/blazingcache-core/src/test/java/blazingcache/server/CountdownWatcher.java b/blazingcache-core/src/test/java/blazingcache/server/CountdownWatcher.java new file mode 100644 index 0000000..833f7d5 --- /dev/null +++ b/blazingcache-core/src/test/java/blazingcache/server/CountdownWatcher.java @@ -0,0 +1,102 @@ +/* + * Licensed to Diennea S.r.l. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Diennea S.r.l. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package blazingcache.server; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.junit.Assert; + + +public class CountdownWatcher implements Watcher { + + protected static final Logger LOG = Logger.getLogger("" + CountdownWatcher.class); + private final String name; + private CountDownLatch clientConnected; + private Event.KeeperState state; + private boolean connected; + private boolean expired; + + public CountdownWatcher(String name) { + this.name = name; + reset(); + } + + private synchronized void reset() { + clientConnected = new CountDownLatch(1); + state = Event.KeeperState.Disconnected; + connected = false; + expired = false; + } + + public void process(WatchedEvent event) { + LOG.info("Watcher " + name + " got event " + event); + setState(event.getState()); + setConnected(true); + clientConnected.countDown(); + if (event.getState() == Event.KeeperState.Expired) { + expired = true; + } + } + + public synchronized boolean isConnected() { + return connected; + } + + public synchronized boolean isExpired() { + return expired; + } + + public synchronized Event.KeeperState state() { + return state; + } + + public void setState(Event.KeeperState state) { + this.state = state; + } + + public void setConnected(boolean connected) { + this.connected = connected; + } + + public void setExpired(boolean expired) { + this.expired = expired; + } + + public void waitForConnected(long timeout) throws InterruptedException, TimeoutException { + Assert.assertTrue(clientConnected.await(timeout, TimeUnit.MILLISECONDS)); + } + + public void waitForExpired(long timeout) throws InterruptedException, TimeoutException { + long expire = System.currentTimeMillis() + timeout; + long left = timeout; + while (!isExpired() && left > 0) { + Thread.sleep(left); + left = expire - System.currentTimeMillis(); + } + if (!isConnected()) { + throw new TimeoutException("Did not disconnect"); + } + } + +} diff --git a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java index 1bebc45..d489e38 100644 --- a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java +++ b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKSecureTest.java @@ -269,102 +269,4 @@ public static void printStackTrace(Throwable t) { t.printStackTrace(); } - private void waitForLeadershipState(final CacheServer server, final boolean leaderState) { - - } - - private static class CountdownWatcher implements Watcher { - - protected static final Logger LOG - = Logger.getLogger("" + CountdownWatcher.class); - - private final String name; - private CountDownLatch clientConnected; - private KeeperState state; - private boolean connected; - private boolean expired; - - public CountdownWatcher(String name) { - this.name = name; - reset(); - } - - private synchronized void reset() { - clientConnected = new CountDownLatch(1); - state = KeeperState.Disconnected; - connected = false; - expired = false; - } - - public synchronized void process(WatchedEvent event) { - LOG.info("Watcher " + name + " got event " + event); - - state = event.getState(); - if (state == KeeperState.SyncConnected) { - connected = true; - clientConnected.countDown(); - } else if (state == KeeperState.SaslAuthenticated) { - } else { - connected = false; - } - if (state == KeeperState.Expired) { - expired = true; - } else if (state == KeeperState.SaslAuthenticated) { - } else { - expired = false; - } - notifyAll(); - } - - public synchronized boolean isConnected() { - return connected; - } - - public synchronized KeeperState state() { - return state; - } - - public synchronized void waitForConnected(long timeout) - throws InterruptedException, TimeoutException { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while (!connected && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (!connected) { - throw new TimeoutException("Did not connect"); - - } - } - - public synchronized void waitForDisconnected(long timeout) - throws InterruptedException, TimeoutException { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while (connected && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (connected) { - throw new TimeoutException("Did not disconnect"); - - } - } - - public synchronized void waitForExpired(long timeout) - throws InterruptedException, TimeoutException { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while (!expired && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (!connected) { - throw new TimeoutException("Did not disconnect"); - - } - } - } - } diff --git a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java index 560c7da..236fa2d 100644 --- a/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java +++ b/blazingcache-core/src/test/java/blazingcache/server/SimpleZKTest.java @@ -27,8 +27,8 @@ import blazingcache.ZKTestEnv; import blazingcache.client.CacheClient; import blazingcache.network.ServerHostData; -import blazingcache.server.CacheServer; import blazingcache.zookeeper.ZKCacheServerLocator; +import java.util.concurrent.TimeUnit; /** * @@ -43,13 +43,12 @@ public class SimpleZKTest { public void basicTest() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData hostData = new ServerHostData("localhost", 1234, "ciao", false, null); - try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); - CacheServer cacheServer = new CacheServer("ciao", hostData)) { + try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", hostData)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), hostData, false); cacheServer.start(); - try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); - CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = + new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -76,13 +75,12 @@ public void basicTest() throws Exception { public void sessionExpirationTest_SingleCacheServer() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData hostData = new ServerHostData("localhost", 1234, "ciao", false, null); - try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); - CacheServer cacheServer = new CacheServer("ciao", hostData)) { + try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", hostData)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), hostData, false); cacheServer.start(); - try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); - CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = + new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -102,8 +100,7 @@ public void sessionExpirationTest_SingleCacheServer() throws Exception { /* * Make's ZooKeeper's session expire: * - * this is the session id and password to use on a second zookeeper - * handle so as to make service monitor's handle to expire + * this is the session id and password to use on a second zookeeper handle so as to make service monitor's handle to expire */ final long serviceZKSessionId = cacheServer.getZooKeeper().getSessionId(); final byte[] serviceZKpasswd = cacheServer.getZooKeeper().getSessionPasswd(); @@ -139,9 +136,8 @@ public void sessionExpirationTest_BackupServer() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); final ServerHostData leaderHostdata = new ServerHostData("localhost", 1234, "leader", false, null); final ServerHostData backupHostdata = new ServerHostData("localhost", 1235, "backup", false, null); - try (ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); - CacheServer cacheServer = new CacheServer("ciao", leaderHostdata); - CacheServer cacheServerBk = new CacheServer("ciao", backupHostdata)) { + try ( ZKTestEnv zkEnv = new ZKTestEnv(folderZk.getRoot().toPath()); CacheServer cacheServer = new CacheServer("ciao", leaderHostdata); CacheServer cacheServerBk = new CacheServer("ciao", + backupHostdata)) { cacheServer.setupCluster(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath(), leaderHostdata, false); @@ -155,8 +151,8 @@ public void sessionExpirationTest_BackupServer() throws Exception { zkEnv.getPath(), backupHostdata, false); cacheServerBk.start(); - try (CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); - CacheClient client2 = new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath())); CacheClient client2 = + new CacheClient("theClient2", "ciao", new ZKCacheServerLocator(zkEnv.getAddress(), zkEnv.getTimeout(), zkEnv.getPath()))) { client1.start(); client2.start(); @@ -176,8 +172,7 @@ public void sessionExpirationTest_BackupServer() throws Exception { /* * Make's ZooKeeper's session expire: * - * this is the session id and password to use on a second zookeeper - * handle so as to make service monitor's handle to expire + * this is the session id and password to use on a second zookeeper handle so as to make service monitor's handle to expire */ final long serviceZKSessionId = cacheServer.getZooKeeper().getSessionId(); final byte[] serviceZKpasswd = cacheServer.getZooKeeper().getSessionPasswd(); @@ -256,96 +251,4 @@ public static void printStackTrace(Throwable t) { t.printStackTrace(); } - private void waitForLeadershipState(final CacheServer server, final boolean leaderState) { - - } - - private static class CountdownWatcher implements Watcher { - protected static final Logger LOG = - Logger.getLogger("" + CountdownWatcher.class); - - private final String name; - private CountDownLatch clientConnected; - private KeeperState state; - private boolean connected; - private boolean expired; - - public CountdownWatcher(String name) { - this.name = name; - reset(); - } - private synchronized void reset() { - clientConnected = new CountDownLatch(1); - state = KeeperState.Disconnected; - connected = false; - expired = false; - } - public synchronized void process(WatchedEvent event) { - LOG.info("Watcher " + name + " got event " + event); - - state = event.getState(); - if (state == KeeperState.SyncConnected) { - connected = true; - clientConnected.countDown(); - } else { - connected = false; - } - if (state == KeeperState.Expired) { - expired = true; - } else { - expired = false; - } - notifyAll(); - } - public synchronized boolean isConnected() { - return connected; - } - public synchronized KeeperState state() { - return state; - } - public synchronized void waitForConnected(long timeout) - throws InterruptedException, TimeoutException - { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while(!connected && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (!connected) { - throw new TimeoutException("Did not connect"); - - } - } - public synchronized void waitForDisconnected(long timeout) - throws InterruptedException, TimeoutException - { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while(connected && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (connected) { - throw new TimeoutException("Did not disconnect"); - - } - } - - public synchronized void waitForExpired(long timeout) - throws InterruptedException, TimeoutException - { - long expire = System.currentTimeMillis() + timeout; - long left = timeout; - while(!expired && left > 0) { - wait(left); - left = expire - System.currentTimeMillis(); - } - if (!connected) { - throw new TimeoutException("Did not disconnect"); - - } - } - } - } diff --git a/pom.xml b/pom.xml index 3bbaf6f..9fef2ac 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,8 @@ 1.0 3.1.0 4.0.4 - 0.8.5 + 0.8.5 + 1.7.30 From e31194dc7ddeeca2bfcd5d452187b828a5c4c416 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 13:42:36 +0200 Subject: [PATCH 07/12] ensure binary compatibility with jdk8 --- pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index 9fef2ac..52ddf63 100644 --- a/pom.xml +++ b/pom.xml @@ -174,6 +174,7 @@ 3.8.1 ${project.build.sourceEncoding} + 8 @@ -232,6 +233,7 @@ 3.8.1 ${project.build.sourceEncoding} + 8 From 97e019a30944655e3667f16d5c00ab94a455a6e5 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 13:48:02 +0200 Subject: [PATCH 08/12] fix pom --- blazingcache-services/pom.xml | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/blazingcache-services/pom.xml b/blazingcache-services/pom.xml index 48ffb26..680f70b 100644 --- a/blazingcache-services/pom.xml +++ b/blazingcache-services/pom.xml @@ -6,14 +6,14 @@ blazingcache 2.3.0-SNAPSHOT - blazingcache-services + blazingcache-services jar BlazingCache :: Services Apache License, Version 2.0 http://www.apache.org/licenses/LICENSE-2.0.txt - repo + repo @@ -35,12 +35,12 @@ copy-dependencies - + - + org.apache.maven.plugins maven-assembly-plugin @@ -48,14 +48,14 @@ false - src/main/assemble/zip-assembly.xml - + src/main/assemble/zip-assembly.xml + true blazingcache-${project.version} - make-assembly + make-assembly package single @@ -63,14 +63,14 @@ - + - + ${project.groupId} blazingcache-core - ${project.version} + ${project.version} org.apache.zookeeper @@ -114,13 +114,18 @@ junit junit - 4.12 + 4.12 test org.slf4j slf4j-jdk14 - 1.7.24 + ${libs.slf4j} - + + org.slf4j + slf4j-api + ${libs.slf4j} + + From 5e3d89723861c33afd5f6f6ceabe33f112cdb25b Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 13:53:04 +0200 Subject: [PATCH 09/12] fix jcache pom --- blazingcache-jcache/pom.xml | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/blazingcache-jcache/pom.xml b/blazingcache-jcache/pom.xml index 5b87ff6..24252a0 100644 --- a/blazingcache-jcache/pom.xml +++ b/blazingcache-jcache/pom.xml @@ -42,7 +42,21 @@ org.apache.curator curator-test - 2.11.1 + ${libs.curator} + test + + + + io.dropwizard.metrics + metrics-core + ${libs.metrics} + test + + + + org.xerial.snappy + snappy-java + ${libs.snappy} test From bda9b5a15e46b242228fe6884fa0b7a06ef79358 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 14:35:53 +0200 Subject: [PATCH 10/12] Issue 148: 'Stuck client' may not be detected in case of non-open socket --- blazingcache-core/pom.xml | 6 ++ .../network/netty/NettyChannel.java | 3 +- .../server/CacheServerEndpoint.java | 3 +- .../server/CacheServerSideConnection.java | 6 +- .../client/StuckClientMachineTest.java | 95 +++++++++++++++++++ pom.xml | 5 +- 6 files changed, 113 insertions(+), 5 deletions(-) create mode 100644 blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java diff --git a/blazingcache-core/pom.xml b/blazingcache-core/pom.xml index f72eaa5..d1e16e0 100644 --- a/blazingcache-core/pom.xml +++ b/blazingcache-core/pom.xml @@ -138,5 +138,11 @@ ${libs.slf4j} test + + org.powermock + powermock-reflect + ${libs.powermock} + test + diff --git a/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java b/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java index 3c36d91..e8fb700 100644 --- a/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java +++ b/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java @@ -46,7 +46,7 @@ public class NettyChannel extends Channel { private static final boolean DISCONNECT_ON_PENDING_REPLY_TIMEOUT = Boolean.parseBoolean(System.getProperty("blazingcache.nettychannel.disconnectonpendingreplytimeout", "true")); - volatile SocketChannel socket; + private volatile SocketChannel socket; private static final Logger LOGGER = Logger.getLogger(NettyChannel.class.getName()); private static final AtomicLong idGenerator = new AtomicLong(); @@ -209,6 +209,7 @@ public void messageSent(Message originalMessage, Throwable error) { @Override public boolean isValid() { SocketChannel _socket = socket; + System.out.println("isValid "+ioErrors+" "+this); return _socket != null && _socket.isOpen() && !ioErrors; } diff --git a/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java b/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java index 32eabc8..a16ea9d 100644 --- a/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java +++ b/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java @@ -90,8 +90,9 @@ void connectionClosed(CacheServerSideConnection con) { } void processIdleConnections() { - try { + try { List connections = new ArrayList<>(clientConnections.values()); + LOGGER.log(Level.INFO, "processIdleConnections " + connections); for (CacheServerSideConnection cs : connections) { cs.processIdleConnection(); } diff --git a/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java b/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java index dd9dd5a..af9b3c7 100644 --- a/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java +++ b/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java @@ -599,10 +599,14 @@ public void replyReceived(Message originalMessage, Message message, Throwable er }); } - void processIdleConnection() { + void processIdleConnection() { Channel _channel = channel; + if (_channel != null && _channel.isValid()) { + LOGGER.log(Level.INFO, "processIdleConnection DONE " + _channel); _channel.channelIdle(); + } else { + LOGGER.log(Level.INFO, "processIdleConnection SKIP " + _channel); } } diff --git a/blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java b/blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java new file mode 100644 index 0000000..0a9389e --- /dev/null +++ b/blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java @@ -0,0 +1,95 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package blazingcache.client; + +import blazingcache.network.ServerHostData; +import blazingcache.network.netty.NettyCacheServerLocator; +import blazingcache.server.CacheServer; +import java.nio.charset.StandardCharsets; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import org.junit.Test; +import static org.junit.Assert.assertTrue; +import blazingcache.network.Message; +import blazingcache.network.netty.NettyChannel; +import blazingcache.server.CacheServerSideConnection; +import io.netty.channel.socket.SocketChannel; +import org.powermock.reflect.Whitebox; + +/** + * Test for loadEntry + * + * @author enrico.olivelli + */ +public class StuckClientMachineTest { + + @Test + public void basicTest() throws Exception { + byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); + + ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); + try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { + + cacheServer.setSlowClientTimeout(10000); + cacheServer.start(); + + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData)) { + @Override + public void messageReceived(Message message) { + System.out.println("messageReceived "+message); + // swallow every message + } + + }; + ) { + + client1.start(); + client2.start(); + + + assertTrue(client1.waitForConnection(10000)); + assertTrue(client2.waitForConnection(10000)); + CacheServerSideConnection serverSideConnectionPeer2 = cacheServer.getAcceptor().getClientConnections().get(client2.getClientId()); + + + client1.load("foo", data, 0); + assertNotNull(client2.fetch("foo")); + + System.out.println("QUIIiIIIIIIII"); + + + client1.invalidate("foo"); + + // we are sure that we are using the NettyChannel, not JVMChannel + NettyChannel channel = (NettyChannel) serverSideConnectionPeer2.getChannel(); + Whitebox.setInternalState(channel, "ioErrors", true); + + + assertNull(client1.get("foo")); + assertNull(client2.get("foo")); + + } + + } + + } + +} diff --git a/pom.xml b/pom.xml index 52ddf63..8eab8ae 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,7 @@ 4.0.4 0.8.5 1.7.30 + 2.0.5 @@ -161,7 +162,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 + 3.0.0-M3 @{argLine} -Xmx1024m -Djava.net.preferIpv4Stack=true 1 @@ -220,7 +221,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 + 3.0.0-M3 -Xmx1024m -Djava.net.preferIpv4Stack=true 1 From a7edc34744fde2be211bd5f435b51e0d44320687 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 14:57:29 +0200 Subject: [PATCH 11/12] xx --- .../network/netty/NettyChannel.java | 6 +- .../server/CacheServerSideConnection.java | 6 +- .../client/StuckClientMachineTest.java | 80 +++++++++---------- pom.xml | 4 +- 4 files changed, 45 insertions(+), 51 deletions(-) diff --git a/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java b/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java index e8fb700..f1c3b50 100644 --- a/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java +++ b/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java @@ -55,7 +55,7 @@ public class NettyChannel extends Channel { private final Map pendingReplyMessagesDeadline = new ConcurrentHashMap<>(); private final ExecutorService callbackexecutor; private final NettyConnector connector; - private boolean ioErrors = false; + private volatile boolean ioErrors = false; private final long id = idGenerator.incrementAndGet(); private final boolean disconnectOnReplyTimeout; @@ -251,7 +251,7 @@ public void close() { } } - void exceptionCaught(Throwable cause) { + public void exceptionCaught(Throwable cause) { LOGGER.log(Level.SEVERE, this + " io-error " + cause, cause); ioErrors = true; } @@ -278,10 +278,12 @@ public void channelIdle() { processPendingReplyMessagesDeadline(); } + @Override public String getName() { return name; } + @Override public void setName(String name) { this.name = name; } diff --git a/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java b/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java index af9b3c7..a2b08fb 100644 --- a/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java +++ b/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java @@ -601,12 +601,8 @@ public void replyReceived(Message originalMessage, Message message, Throwable er void processIdleConnection() { Channel _channel = channel; - - if (_channel != null && _channel.isValid()) { - LOGGER.log(Level.INFO, "processIdleConnection DONE " + _channel); + if (_channel != null) { _channel.channelIdle(); - } else { - LOGGER.log(Level.INFO, "processIdleConnection SKIP " + _channel); } } diff --git a/blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java b/blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java index 0a9389e..594b2de 100644 --- a/blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java +++ b/blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java @@ -1,21 +1,21 @@ /* - Licensed to Diennea S.r.l. under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. Diennea S.r.l. licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - + * Licensed to Diennea S.r.l. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Diennea S.r.l. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * */ package blazingcache.client; @@ -30,8 +30,7 @@ import blazingcache.network.Message; import blazingcache.network.netty.NettyChannel; import blazingcache.server.CacheServerSideConnection; -import io.netty.channel.socket.SocketChannel; -import org.powermock.reflect.Whitebox; +import java.util.concurrent.atomic.AtomicReference; /** * Test for loadEntry @@ -45,46 +44,43 @@ public void basicTest() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); - try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { - + try ( CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { + cacheServer.setSlowClientTimeout(10000); cacheServer.start(); - - try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); - CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData)) { + AtomicReference _client2 = new AtomicReference<>(); + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); CacheClient client2 = new CacheClient("theClient2", "ciao", + new NettyCacheServerLocator(serverHostData)) { @Override public void messageReceived(Message message) { - System.out.println("messageReceived "+message); // swallow every message + // client2 will not answer to the "invaliate" message + // client1 has to wait until the server declares client2 dead + + // simulate a network error, only on the server side part + CacheServerSideConnection serverSideConnectionPeer2 = cacheServer.getAcceptor().getClientConnections().get(_client2.get().getClientId()); + // we are sure that we are using the NettyChannel, not JVMChannel + NettyChannel channel = (NettyChannel) serverSideConnectionPeer2.getChannel(); + channel.exceptionCaught(new Exception("dummy unpredictable error")); } - - }; - ) { + };) { + _client2.set(client2); client1.start(); client2.start(); - assertTrue(client1.waitForConnection(10000)); assertTrue(client2.waitForConnection(10000)); - CacheServerSideConnection serverSideConnectionPeer2 = cacheServer.getAcceptor().getClientConnections().get(client2.getClientId()); - - + client1.load("foo", data, 0); assertNotNull(client2.fetch("foo")); - - System.out.println("QUIIiIIIIIIII"); - client1.invalidate("foo"); - - // we are sure that we are using the NettyChannel, not JVMChannel - NettyChannel channel = (NettyChannel) serverSideConnectionPeer2.getChannel(); - Whitebox.setInternalState(channel, "ioErrors", true); - - + assertNull(client1.get("foo")); - assertNull(client2.get("foo")); + + // client2 does not know that the server had problems and it still holds a copy of the value + assertNotNull(client2.get("foo")); } diff --git a/pom.xml b/pom.xml index 9147e37..8eab8ae 100644 --- a/pom.xml +++ b/pom.xml @@ -162,7 +162,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 + 3.0.0-M3 @{argLine} -Xmx1024m -Djava.net.preferIpv4Stack=true 1 @@ -221,7 +221,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 + 3.0.0-M3 -Xmx1024m -Djava.net.preferIpv4Stack=true 1 From 77e92e9c7ca29a6c669f97931d7730f20967645d Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 1 Jul 2020 15:02:06 +0200 Subject: [PATCH 12/12] clean up --- blazingcache-core/pom.xml | 18 ++--- .../network/netty/NettyChannel.java | 1 - .../server/CacheServerEndpoint.java | 1 - .../server/CacheServerSideConnection.java | 4 +- ...yStuckClientDueToServerSideErrorTest.java} | 9 +-- pom.xml | 73 +++++++++---------- 6 files changed, 46 insertions(+), 60 deletions(-) rename blazingcache-core/src/test/java/blazingcache/client/{StuckClientMachineTest.java => ApparentlyStuckClientDueToServerSideErrorTest.java} (96%) diff --git a/blazingcache-core/pom.xml b/blazingcache-core/pom.xml index d1e16e0..aeb668b 100644 --- a/blazingcache-core/pom.xml +++ b/blazingcache-core/pom.xml @@ -6,14 +6,14 @@ blazingcache 2.3.0-SNAPSHOT - blazingcache-core + blazingcache-core jar BlazingCache :: Core Apache License, Version 2.0 http://www.apache.org/licenses/LICENSE-2.0.txt - repo + repo @@ -42,7 +42,7 @@ io.netty netty-tcnative-boringssl-static ${libs.netty4.tcnative} - compile + compile javax.servlet @@ -50,7 +50,7 @@ ${libs.servletapi} provided - + org.codehaus.jackson jackson-mapper-asl ${libs.jackson.mapper} @@ -102,7 +102,7 @@ ${libs.snappy} test - + org.apache.hadoop hadoop-minikdc @@ -130,7 +130,7 @@ org.slf4j slf4j-api - ${libs.slf4j} + ${libs.slf4j} org.slf4j @@ -138,11 +138,5 @@ ${libs.slf4j} test - - org.powermock - powermock-reflect - ${libs.powermock} - test - diff --git a/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java b/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java index f1c3b50..4cb6cc7 100644 --- a/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java +++ b/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java @@ -209,7 +209,6 @@ public void messageSent(Message originalMessage, Throwable error) { @Override public boolean isValid() { SocketChannel _socket = socket; - System.out.println("isValid "+ioErrors+" "+this); return _socket != null && _socket.isOpen() && !ioErrors; } diff --git a/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java b/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java index a16ea9d..304e209 100644 --- a/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java +++ b/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java @@ -92,7 +92,6 @@ void connectionClosed(CacheServerSideConnection con) { void processIdleConnections() { try { List connections = new ArrayList<>(clientConnections.values()); - LOGGER.log(Level.INFO, "processIdleConnections " + connections); for (CacheServerSideConnection cs : connections) { cs.processIdleConnection(); } diff --git a/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java b/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java index a2b08fb..65596d1 100644 --- a/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java +++ b/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java @@ -502,7 +502,7 @@ public String toString() { void sendKeyInvalidationMessage(String sourceClientId, RawString key, BroadcastRequestStatus invalidation) { Channel _channel = channel; if (_channel == null || !_channel.isValid()) { - // not connected, quindi cache vuota + // not connected, quindi cache vuota LOGGER.log(Level.SEVERE, "client " + clientId + " without channel, considering key " + key + " invalidated"); invalidation.clientDone(clientId); return; @@ -599,7 +599,7 @@ public void replyReceived(Message originalMessage, Message message, Throwable er }); } - void processIdleConnection() { + void processIdleConnection() { Channel _channel = channel; if (_channel != null) { _channel.channelIdle(); diff --git a/blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java b/blazingcache-core/src/test/java/blazingcache/client/ApparentlyStuckClientDueToServerSideErrorTest.java similarity index 96% rename from blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java rename to blazingcache-core/src/test/java/blazingcache/client/ApparentlyStuckClientDueToServerSideErrorTest.java index 594b2de..88a4263 100644 --- a/blazingcache-core/src/test/java/blazingcache/client/StuckClientMachineTest.java +++ b/blazingcache-core/src/test/java/blazingcache/client/ApparentlyStuckClientDueToServerSideErrorTest.java @@ -32,15 +32,10 @@ import blazingcache.server.CacheServerSideConnection; import java.util.concurrent.atomic.AtomicReference; -/** - * Test for loadEntry - * - * @author enrico.olivelli - */ -public class StuckClientMachineTest { +public class ApparentlyStuckClientDueToServerSideErrorTest { @Test - public void basicTest() throws Exception { + public void test() throws Exception { byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); diff --git a/pom.xml b/pom.xml index 8eab8ae..b1f7865 100644 --- a/pom.xml +++ b/pom.xml @@ -12,13 +12,13 @@ Apache License, Version 2.0 http://www.apache.org/licenses/LICENSE-2.0.txt - repo + repo - https://github.com/diennea/blazingcache.git - scm:git:https://github.com/diennea/blazingcache.git - scm:git:https://github.com/diennea/blazingcache.git + https://github.com/diennea/blazingcache.git + scm:git:https://github.com/diennea/blazingcache.git + scm:git:https://github.com/diennea/blazingcache.git HEAD @@ -59,12 +59,12 @@ - https://github.com/diennea/blazingcache - + https://github.com/diennea/blazingcache + UTF-8 1.8 - 1.8 + 1.8 4.1.50.Final 2.0.31.Final 3.1.0 @@ -76,12 +76,11 @@ 2.7 3.2.1 5.0.0 - 1.0 + 1.0 3.1.0 - 4.0.4 + 4.0.4 0.8.5 - 1.7.30 - 2.0.5 + 1.7.30 @@ -116,35 +115,35 @@ org.jacoco jacoco-maven-plugin ${libs.jacoco} - + default - true - + true + ./blazingcache-core ./blazingcache-jcache ./blazingcache-services ./blazingcache-website ./blazingcache-site-skin - - - + + + dev.majordodo.org BlazingCache Public Repository https://dev.majordodo.org/nexus/content/repositories/releases/ - - + + dev.majordodo.org BlazingCache Public Repository https://dev.majordodo.org/nexus/content/repositories/snapshots/ - - - + + + org.jacoco @@ -152,19 +151,19 @@ ${libs.jacoco} - default-prepare-agent + default-prepare-agent prepare-agent - + org.apache.maven.plugins maven-surefire-plugin 3.0.0-M3 - - @{argLine} -Xmx1024m -Djava.net.preferIpv4Stack=true + + @{argLine} -Xmx1024m -Djava.net.preferIpv4Stack=true 1 false @@ -198,18 +197,18 @@ - - + + ossrh - false - + false + ./blazingcache-core - ./blazingcache-jcache - + ./blazingcache-jcache + ossrh @@ -222,7 +221,7 @@ org.apache.maven.plugins maven-surefire-plugin 3.0.0-M3 - + -Xmx1024m -Djava.net.preferIpv4Stack=true 1 false @@ -255,7 +254,7 @@ https://oss.sonatype.org/ false - + org.apache.maven.plugins maven-source-plugin @@ -298,14 +297,14 @@ - + - website + website ./blazingcache-site-skin - ./blazingcache-website + ./blazingcache-website