Skip to content

Commit

Permalink
Merge pull request #155 from eolivelli/fix/fix-refcount-error
Browse files Browse the repository at this point in the history
Fix refcount for fetch() in case of entry coming from the network
  • Loading branch information
diegosalvi authored Feb 11, 2019
2 parents 5a70bfa + d6c4552 commit 3d568f1
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -663,10 +663,10 @@ public void messageReceived(Message message) {
}

ByteBuf buffer = cacheByteArray(data);
CacheEntry cacheEntry = new CacheEntry(key, System.nanoTime(), buffer, expiretime, null);
CacheEntry entry = new CacheEntry(key, System.nanoTime(), buffer, expiretime, null);

storeEntry(entry);

storeEntry(cacheEntry);

Channel _channel = channel;
if (_channel != null) {
_channel.sendReplyMessage(message, Message.ACK(clientId));
Expand Down Expand Up @@ -845,6 +845,8 @@ public CacheEntry fetch(String key, KeyLock lock) throws InterruptedException {
ByteBuf buffer = cacheByteArray(data);
CacheEntry newEntry = new CacheEntry(_key, System.nanoTime(), buffer, expiretime, null);
storeEntry(newEntry);
// client will be responsible of releasing the entry
newEntry.retain();
this.clientMissedGetsToSuccessfulFetches.incrementAndGet();
this.clientHits.incrementAndGet();
return newEntry;
Expand All @@ -860,6 +862,11 @@ public CacheEntry fetch(String key, KeyLock lock) throws InterruptedException {
return null;
}

/**
* Stores the entry in the map.
*
* @param entry
*/
private void storeEntry(CacheEntry entry) {
cache.compute(entry.getKey(), (k, prev) -> {
if (prev != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,11 @@ public void retain() {
}

/**
* Releases the internal buffer
* Releases the entry
*/
@Override
public void close() {
try {
this.buf.release();
} catch (RuntimeException err) {
LOG.log(Level.SEVERE, "Error while releasing entry", err);
}
this.buf.release();
}

synchronized Object resolveReference(EntrySerializer serializer) throws CacheException {
Expand Down
22 changes: 14 additions & 8 deletions blazingcache-core/src/test/java/blazingcache/FetchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
import blazingcache.network.ServerHostData;
import blazingcache.network.netty.NettyCacheServerLocator;
import blazingcache.server.CacheServer;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.SimpleFormatter;
import org.junit.Assert;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;

/**
Expand Down Expand Up @@ -45,7 +43,6 @@ public class FetchTest {
// java.util.logging.Logger.getLogger("").setLevel(level);
// java.util.logging.Logger.getLogger("").addHandler(ch);
// }

@Test
public void basicTest() throws Exception {
byte[] data = "testdata".getBytes(StandardCharsets.UTF_8);
Expand All @@ -62,11 +59,20 @@ public void basicTest() throws Exception {

client1.put("pippo", data, 0);

CacheEntry remoteLoad = client2.fetch("pippo");
assertTrue(remoteLoad != null);
CacheEntry _remoteLoaded;
try (CacheEntry remoteLoad = client2.fetch("pippo");) {
assertNotNull(remoteLoad);
Assert.assertArrayEquals(data, remoteLoad.getSerializedData());
_remoteLoaded = remoteLoad;
}

// same fetch, hits local cache
try (CacheEntry remoteLoad = client2.fetch("pippo");) {
assertSame(remoteLoad, _remoteLoaded);
Assert.assertArrayEquals(data, remoteLoad.getSerializedData());
}

Assert.assertArrayEquals(data, client1.get("pippo").getSerializedData());
Assert.assertArrayEquals(data, remoteLoad.getSerializedData());
Assert.assertArrayEquals(data, client2.get("pippo").getSerializedData());

client1.invalidate("pippo");
Expand Down
104 changes: 104 additions & 0 deletions blazingcache-core/src/test/java/blazingcache/client/HammerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Licensed to Diennea S.r.l. under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. Diennea S.r.l. licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package blazingcache.client;

import blazingcache.network.ServerHostData;
import blazingcache.network.netty.NettyCacheServerLocator;
import blazingcache.server.CacheServer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

/**
* Hammer test, index to excercise concurrency managment of refcounts
*
* @author enrico.olivelli
*/
public class HammerTest {

@Test
public void basicTest() throws Exception {
final byte[] data = "testdata".getBytes(StandardCharsets.UTF_8);
ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null);
try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) {
cacheServer.start();
try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData));
CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData));) {
client1.start();
client2.start();

assertTrue(client1.waitForConnection(10000));
for (int i = 0; i < 100; i++) {
String _key = "key" + i;
byte[] expectedData = _key.getBytes(StandardCharsets.UTF_8);
client1.load(_key, expectedData, 0);
}
assertTrue(client2.waitForConnection(10000));

List<Future> futures = new ArrayList<>();
ExecutorService threads = Executors.newFixedThreadPool(2);
try {
for (int i = 0; i < 20; i++) {
futures.add(threads.submit(() -> {
try {
int j = ThreadLocalRandom.current().nextInt(100);
String _key = "key" + j;
byte[] expectedData = _key.getBytes(StandardCharsets.UTF_8);

try (CacheEntry entry = client2.fetch(_key);) {
assertNotNull("key " + _key + " not found?", entry);
assertArrayEquals(entry.getSerializedData(), expectedData);
}
// rewrite again,from client1
client1.put(_key, expectedData, 0);
} catch (Exception err) {
err.printStackTrace();
// fail the future
throw new RuntimeException(err);
}
}));
}
futures.forEach(a -> {
try {
a.get();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
} finally {
threads.shutdownNow();
threads.awaitTermination(10, TimeUnit.SECONDS);
}

}

}

}
}

0 comments on commit 3d568f1

Please sign in to comment.