Skip to content

Commit

Permalink
Merge 6b1b0cd into 08fdb86
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Jan 2, 2019
2 parents 08fdb86 + 6b1b0cd commit ce26c74
Show file tree
Hide file tree
Showing 10 changed files with 353 additions and 190 deletions.
278 changes: 178 additions & 100 deletions blazingcache-core/src/main/java/blazingcache/client/CacheClient.java

Large diffs are not rendered by default.

76 changes: 65 additions & 11 deletions blazingcache-core/src/main/java/blazingcache/client/CacheEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@

import blazingcache.utils.RawString;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Una entry nella cache
Expand All @@ -34,36 +43,58 @@ public final class CacheEntry {
private long lastGetTime;
private final long putTime;
private final RawString key;
private final byte[] serializedData;
private final ByteBuf buf;
private final long expiretime;
private final int dataLength;
private SoftReference<Object> reference;

public CacheEntry(RawString key, long lastGetTimeNanos, byte[] serializedData, long expiretime, Object deserialized) {
/**
* Creates the entry and refcount of the given ByteBuf is not incremented
*
* @param key
* @param lastGetTimeNanos
* @param serializedData
* @param expiretime
* @param deserialized
*/
CacheEntry(RawString key, long lastGetTimeNanos, ByteBuf serializedData, long expiretime, Object deserialized) {
this.key = key;
this.lastGetTime = lastGetTimeNanos;
this.putTime = lastGetTimeNanos;
this.serializedData = serializedData;
this.buf = serializedData;
this.expiretime = expiretime;
this.dataLength = serializedData.readableBytes();
this.reference = deserialized != null ? new SoftReference<>(deserialized) : null;
}

public CacheEntry(RawString key, long lastGetTimeNanos, byte[] serializedData, long expiretime) {
this.key = key;
this.lastGetTime = lastGetTimeNanos;
this.putTime = lastGetTimeNanos;
this.serializedData = serializedData;
this.expiretime = expiretime;
/**
* Releases the internal buffer
*/
void release() {
try {
this.buf.release();
} catch (RuntimeException err) {
LOG.log(Level.SEVERE, "Error while releasing entry", err);
}
}

synchronized Object resolveReference(EntrySerializer serializer) throws CacheException {
Object resolved = reference != null ? reference.get() : null;
if (resolved == null) {
resolved = serializer.deserializeObject(key.toString(), serializedData);
try (InputStream in = getSerializedDataStream()) {
resolved = serializer.deserializeObject(key.toString(), in);
} catch (IOException err) {
throw new CacheException(err);
}
reference = new SoftReference<>(resolved);
}
return resolved;
}

public int getSerializedDataLength() {
return dataLength;
}

public RawString getKey() {
return key;
}
Expand All @@ -80,9 +111,27 @@ public void setLastGetTime(final long lastGetTimeNanos) {
this.lastGetTime = lastGetTimeNanos;
}

/**
* Access to data. You have to close the stream in order to handle correctly
* refcounts
*
* @return
*/
public InputStream getSerializedDataStream() {
return new ByteBufInputStream(buf.retainedSlice(),
buf.readableBytes(), true /* releaseOnClose */);
}

public boolean isSerializedDataEqualTo(byte[] other) {
// let Netty do the best not to copy memory
return buf.equals(Unpooled.wrappedBuffer(other));
}

public byte[] getSerializedData() {
return serializedData;
// copy data from Direct Memory to Heap
return ByteBufUtil.getBytes(buf);
}
private static final Logger LOG = Logger.getLogger(CacheEntry.class.getName());

public long getExpiretime() {
return expiretime;
Expand All @@ -93,4 +142,9 @@ public String toString() {
return "CacheEntry{" + "key=" + key + ", lastGetTime=" + lastGetTime + ", expiretime=" + expiretime + '}';
}

// only for tests
void discardInternalCachedObject() {
reference = null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/
package blazingcache.client;

import java.io.InputStream;

/**
* An EntrySerializer is used to marshal/unmarshal objects to/from the byte[]
* representation, which is the effective 'cached' value
Expand All @@ -29,5 +31,5 @@ public interface EntrySerializer {

public byte[] serializeObject(String key, Object object) throws CacheException;

public Object deserializeObject(String key, byte[] value) throws CacheException;
public Object deserializeObject(String key, InputStream value) throws CacheException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

import blazingcache.client.CacheException;
import blazingcache.client.EntrySerializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

Expand All @@ -48,10 +48,9 @@ public byte[] serializeObject(String key, Object object) throws CacheException {
}

@Override
public Object deserializeObject(String key, byte[] value) throws CacheException {
try {
ByteArrayInputStream oo = new ByteArrayInputStream(value);
ObjectInputStream oo2 = new ObjectInputStream(oo);
public Object deserializeObject(String key, InputStream value) throws CacheException {
try {
ObjectInputStream oo2 = new ObjectInputStream(value);
return oo2.readUnshared();
} catch (IOException | SecurityException | ClassNotFoundException err) {
throw new CacheException(err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import blazingcache.network.ServerHostData;
import blazingcache.network.netty.NettyCacheServerLocator;
import blazingcache.server.CacheServer;
import io.netty.util.IllegalReferenceCountException;
import java.io.IOException;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;

/**
Expand Down Expand Up @@ -76,28 +79,51 @@ public void basicTest() throws Exception {
assertEquals(0, countObjectReads.get());

client2.putObject(key, myObject1, -1);

assertEquals(2, countObjectWrites.get());
assertEquals(0, countObjectReads.get());
assertEquals(0, countObjectReads.get());
MyBean reference_to_object1_changed = client1.fetchObject(key);
System.out.println("qui "+reference_to_object1_changed);
System.out.println("qui " + reference_to_object1_changed);
Assert.assertNotSame(reference_to_object1_changed, myObject1);

assertEquals(2, countObjectWrites.get());
assertEquals(1, countObjectReads.get());

System.out.println("qua");
MyBean reference_to_object1_changed_2 = client1.fetchObject(key);
Assert.assertSame(reference_to_object1_changed, reference_to_object1_changed_2);

assertEquals(2, countObjectWrites.get());
assertEquals(1, countObjectReads.get());
assertEquals(1, countObjectReads.get());

MyBean reference_to_object1_changed_3 = client1.fetchObject(key);
Assert.assertSame(reference_to_object1_changed, reference_to_object1_changed_3);


assertEquals(2, countObjectWrites.get());
assertEquals(1, countObjectReads.get());

CacheEntry entry = client1.get(key);
entry.discardInternalCachedObject();

MyBean reference_to_object1_changed_3b = client1.fetchObject(key);
Assert.assertNotSame(reference_to_object1_changed, reference_to_object1_changed_3b);

assertEquals(2, countObjectWrites.get());
assertEquals(1, countObjectReads.get());
assertEquals(2, countObjectReads.get());

// disconnection clears internal cache and releases all of the references
// to direct memory
client1.disconnect();
assertTrue(client1.waitForConnection(10000));

assertEquals(0, client1.getCacheSize());
// old reference to cache entry is not valid any more
// we can expect a Netty error
try {
entry.getSerializedData();
fail("should not work, refcount is now 0");
} catch (IllegalReferenceCountException expected) {
}

}

Expand Down
Loading

0 comments on commit ce26c74

Please sign in to comment.