From 0f21d456ecc464ed426c5a4262ae0cbf9e241b18 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Sun, 10 Feb 2019 09:42:15 +0100 Subject: [PATCH 1/2] Fix refcount for fetch() in case of entry coming from the network --- .../java/blazingcache/client/CacheClient.java | 13 ++- .../java/blazingcache/client/CacheEntry.java | 10 +- .../java/blazingcache/client/HammerTest.java | 104 ++++++++++++++++++ 3 files changed, 119 insertions(+), 8 deletions(-) create mode 100644 blazingcache-core/src/test/java/blazingcache/client/HammerTest.java diff --git a/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java b/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java index 2924246..11670cb 100644 --- a/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java +++ b/blazingcache-core/src/main/java/blazingcache/client/CacheClient.java @@ -663,10 +663,10 @@ public void messageReceived(Message message) { } ByteBuf buffer = cacheByteArray(data); - CacheEntry cacheEntry = new CacheEntry(key, System.nanoTime(), buffer, expiretime, null); + CacheEntry entry = new CacheEntry(key, System.nanoTime(), buffer, expiretime, null); + + storeEntry(entry); - storeEntry(cacheEntry); - Channel _channel = channel; if (_channel != null) { _channel.sendReplyMessage(message, Message.ACK(clientId)); @@ -845,6 +845,8 @@ public CacheEntry fetch(String key, KeyLock lock) throws InterruptedException { ByteBuf buffer = cacheByteArray(data); CacheEntry newEntry = new CacheEntry(_key, System.nanoTime(), buffer, expiretime, null); storeEntry(newEntry); + // client will be responsible of releasing the entry + newEntry.retain(); this.clientMissedGetsToSuccessfulFetches.incrementAndGet(); this.clientHits.incrementAndGet(); return newEntry; @@ -860,6 +862,11 @@ public CacheEntry fetch(String key, KeyLock lock) throws InterruptedException { return null; } + /** + * Stores the entry in the map. + * + * @param entry + */ private void storeEntry(CacheEntry entry) { cache.compute(entry.getKey(), (k, prev) -> { if (prev != null) { diff --git a/blazingcache-core/src/main/java/blazingcache/client/CacheEntry.java b/blazingcache-core/src/main/java/blazingcache/client/CacheEntry.java index 8727c0d..66347f1 100644 --- a/blazingcache-core/src/main/java/blazingcache/client/CacheEntry.java +++ b/blazingcache-core/src/main/java/blazingcache/client/CacheEntry.java @@ -74,16 +74,16 @@ public void retain() { this.buf.retain(); } + void release() { + this.buf.release(); + } + /** * Releases the internal buffer */ @Override public void close() { - try { - this.buf.release(); - } catch (RuntimeException err) { - LOG.log(Level.SEVERE, "Error while releasing entry", err); - } + release(); } synchronized Object resolveReference(EntrySerializer serializer) throws CacheException { diff --git a/blazingcache-core/src/test/java/blazingcache/client/HammerTest.java b/blazingcache-core/src/test/java/blazingcache/client/HammerTest.java new file mode 100644 index 0000000..b7ca024 --- /dev/null +++ b/blazingcache-core/src/test/java/blazingcache/client/HammerTest.java @@ -0,0 +1,104 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package blazingcache.client; + +import blazingcache.network.ServerHostData; +import blazingcache.network.netty.NettyCacheServerLocator; +import blazingcache.server.CacheServer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +/** + * Hammer test, index to excercise concurrency managment of refcounts + * + * @author enrico.olivelli + */ +public class HammerTest { + + @Test + public void basicTest() throws Exception { + final byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); + ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); + try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { + cacheServer.start(); + try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); + CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData));) { + client1.start(); + client2.start(); + + assertTrue(client1.waitForConnection(10000)); + for (int i = 0; i < 100; i++) { + String _key = "key" + i; + byte[] expectedData = _key.getBytes(StandardCharsets.UTF_8); + client1.load(_key, expectedData, 0); + } + assertTrue(client2.waitForConnection(10000)); + + List futures = new ArrayList<>(); + ExecutorService threads = Executors.newFixedThreadPool(4); + try { + for (int i = 0; i < 100; i++) { + futures.add(threads.submit(() -> { + try { + int j = ThreadLocalRandom.current().nextInt(100); + String _key = "key" + j; + byte[] expectedData = _key.getBytes(StandardCharsets.UTF_8); + + try (CacheEntry entry = client2.fetch(_key);) { + assertNotNull("key " + _key + " not found?", entry); + assertArrayEquals(entry.getSerializedData(), expectedData); + } + // rewrite again,from client1 + client1.put(_key, expectedData, 0); + } catch (Exception err) { + err.printStackTrace(); + // fail the future + throw new RuntimeException(err); + } + })); + } + futures.forEach(a -> { + try { + a.get(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }); + } finally { + threads.shutdownNow(); + threads.awaitTermination(10, TimeUnit.SECONDS); + } + + } + + } + + } +} From 789fc8164319a23c648d336951762d1e42fff51c Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Sun, 10 Feb 2019 09:46:31 +0100 Subject: [PATCH 2/2] Drop unused method --- .../src/main/java/blazingcache/client/CacheEntry.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/blazingcache-core/src/main/java/blazingcache/client/CacheEntry.java b/blazingcache-core/src/main/java/blazingcache/client/CacheEntry.java index 66347f1..256addc 100644 --- a/blazingcache-core/src/main/java/blazingcache/client/CacheEntry.java +++ b/blazingcache-core/src/main/java/blazingcache/client/CacheEntry.java @@ -74,16 +74,12 @@ public void retain() { this.buf.retain(); } - void release() { - this.buf.release(); - } - /** - * Releases the internal buffer + * Releases the entry */ @Override public void close() { - release(); + this.buf.release(); } synchronized Object resolveReference(EntrySerializer serializer) throws CacheException {