Skip to content

Commit

Permalink
Merge 789fc81 into 5a70bfa
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Feb 10, 2019
2 parents 5a70bfa + 789fc81 commit b3322e7
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -663,10 +663,10 @@ public void messageReceived(Message message) {
}

ByteBuf buffer = cacheByteArray(data);
CacheEntry cacheEntry = new CacheEntry(key, System.nanoTime(), buffer, expiretime, null);
CacheEntry entry = new CacheEntry(key, System.nanoTime(), buffer, expiretime, null);

storeEntry(entry);

storeEntry(cacheEntry);

Channel _channel = channel;
if (_channel != null) {
_channel.sendReplyMessage(message, Message.ACK(clientId));
Expand Down Expand Up @@ -845,6 +845,8 @@ public CacheEntry fetch(String key, KeyLock lock) throws InterruptedException {
ByteBuf buffer = cacheByteArray(data);
CacheEntry newEntry = new CacheEntry(_key, System.nanoTime(), buffer, expiretime, null);
storeEntry(newEntry);
// client will be responsible of releasing the entry
newEntry.retain();
this.clientMissedGetsToSuccessfulFetches.incrementAndGet();
this.clientHits.incrementAndGet();
return newEntry;
Expand All @@ -860,6 +862,11 @@ public CacheEntry fetch(String key, KeyLock lock) throws InterruptedException {
return null;
}

/**
* Stores the entry in the map.
*
* @param entry
*/
private void storeEntry(CacheEntry entry) {
cache.compute(entry.getKey(), (k, prev) -> {
if (prev != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,11 @@ public void retain() {
}

/**
* Releases the internal buffer
* Releases the entry
*/
@Override
public void close() {
try {
this.buf.release();
} catch (RuntimeException err) {
LOG.log(Level.SEVERE, "Error while releasing entry", err);
}
this.buf.release();
}

synchronized Object resolveReference(EntrySerializer serializer) throws CacheException {
Expand Down
104 changes: 104 additions & 0 deletions blazingcache-core/src/test/java/blazingcache/client/HammerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Licensed to Diennea S.r.l. under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. Diennea S.r.l. licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package blazingcache.client;

import blazingcache.network.ServerHostData;
import blazingcache.network.netty.NettyCacheServerLocator;
import blazingcache.server.CacheServer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

/**
* Hammer test, index to excercise concurrency managment of refcounts
*
* @author enrico.olivelli
*/
public class HammerTest {

@Test
public void basicTest() throws Exception {
final byte[] data = "testdata".getBytes(StandardCharsets.UTF_8);
ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null);
try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) {
cacheServer.start();
try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData));
CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData));) {
client1.start();
client2.start();

assertTrue(client1.waitForConnection(10000));
for (int i = 0; i < 100; i++) {
String _key = "key" + i;
byte[] expectedData = _key.getBytes(StandardCharsets.UTF_8);
client1.load(_key, expectedData, 0);
}
assertTrue(client2.waitForConnection(10000));

List<Future> futures = new ArrayList<>();
ExecutorService threads = Executors.newFixedThreadPool(4);
try {
for (int i = 0; i < 100; i++) {
futures.add(threads.submit(() -> {
try {
int j = ThreadLocalRandom.current().nextInt(100);
String _key = "key" + j;
byte[] expectedData = _key.getBytes(StandardCharsets.UTF_8);

try (CacheEntry entry = client2.fetch(_key);) {
assertNotNull("key " + _key + " not found?", entry);
assertArrayEquals(entry.getSerializedData(), expectedData);
}
// rewrite again,from client1
client1.put(_key, expectedData, 0);
} catch (Exception err) {
err.printStackTrace();
// fail the future
throw new RuntimeException(err);
}
}));
}
futures.forEach(a -> {
try {
a.get();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
} finally {
threads.shutdownNow();
threads.awaitTermination(10, TimeUnit.SECONDS);
}

}

}

}
}

0 comments on commit b3322e7

Please sign in to comment.