Skip to content

Commit

Permalink
Use clusterTime instead of system time (#28)
Browse files Browse the repository at this point in the history
(ported from #29)
  • Loading branch information
emre-aydin committed Apr 10, 2017
1 parent b57a0d5 commit 336804f
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ private String nextMarkerId() {
return Long.toString(markerIdCounter.getAndIncrement());
}

private long nextTimestamp() {
protected long nextTimestamp() {
return hazelcastInstance == null ? Clock.currentTimeMillis()
: HazelcastTimestamper.nextTimestamp(hazelcastInstance);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.hazelcast.hibernate.RegionCache;
import com.hazelcast.hibernate.serialization.Expirable;
import com.hazelcast.hibernate.serialization.Value;
import com.hazelcast.util.Clock;

/**
* A timestamp based local RegionCache
Expand All @@ -33,7 +32,10 @@ public TimestampsRegionCache(final String name, final HazelcastInstance hazelcas

@Override
public boolean put(final Object key, final Object value, final long txTimestamp, final Object version) {
boolean succeed = super.put(key, value, (Long) value, version);
// use the value in txTimestamp as the timestamp instead of the value, since
// hibernate pre-invalidates with a large value, and then invalidates with
//the actual time, which can cause queries to not be cached.
boolean succeed = super.put(key, value, txTimestamp, version);
if (succeed) {
maybeNotifyTopic(key, value, version);
}
Expand All @@ -45,21 +47,19 @@ protected void maybeInvalidate(final Object messageObject) {
final Timestamp ts = (Timestamp) messageObject;
final Object key = ts.getKey();

for (;;) {
for (; ; ) {
final Expirable value = cache.get(key);
final Long current = value != null ? (Long) value.getValue() : null;
if (current != null) {
if (ts.getTimestamp() > current) {
if (cache.replace(key, value, new Value(value.getVersion(),
Clock.currentTimeMillis(), ts.getTimestamp()))) {
if (cache.replace(key, value, new Value(value.getVersion(), nextTimestamp(), ts.getTimestamp()))) {
return;
}
} else {
return;
}
} else {
if (cache.putIfAbsent(key, new Value(null, Clock.currentTimeMillis(),
ts.getTimestamp())) == null) {
if (cache.putIfAbsent(key, new Value(null, nextTimestamp(), ts.getTimestamp())) == null) {
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.hazelcast.hibernate.local;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Cluster;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Member;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;

@RunWith(MockitoJUnitRunner.class)
public class TimestampsRegionCacheTest {

private static final String CACHE_NAME = "cache";

@Mock private Config config;
@Mock private MapConfig mapConfig;
@Mock private ITopic<Object> topic;
@Mock private HazelcastInstance instance;
@Mock private Cluster cluster;
@Mock private Member member;

private TimestampsRegionCache target;
private MessageListener<Object> listener;

@SuppressWarnings({ "rawtypes", "unchecked" })
@Before
public void setup() {
when(config.findMapConfig(eq(CACHE_NAME))).thenReturn(mapConfig);
when(instance.getCluster()).thenReturn(cluster);
when(instance.getConfig()).thenReturn(config);
when(instance.getTopic(eq(CACHE_NAME))).thenReturn(topic);

// make the message appear that it is coming from a different member of the cluster
when(member.localMember()).thenReturn(false);

ArgumentCaptor<MessageListener> listener = ArgumentCaptor.forClass(MessageListener.class);
when(topic.addMessageListener(listener.capture())).thenReturn("ignored");
target = new TimestampsRegionCache(CACHE_NAME, instance);
this.listener = listener.getValue();
}

@Test
public void shouldUseClusterTimestampFromInvalidationmessageInsteadOfSystemTime() {
long firstTimestamp = 1;
long secondTimestamp = 2;
long publishTime = 3;
long clusterTime = 4;

when(cluster.getClusterTime()).thenReturn(firstTimestamp, secondTimestamp);

// cache is primed by call, that uses clusterTime instead of system clock for timestamp
assertThat(target.put("QuerySpace", firstTimestamp, firstTimestamp, null), is(true));

assertThat("primed value should be in the cache", (Long)target.get("QuerySpace", firstTimestamp), is(firstTimestamp));

// a message is generated on a different cluster member informing us to update the timestamp region cache
Message<Object> message = new Message<Object>("topicName", new Timestamp("QuerySpace", secondTimestamp), publishTime, member);

// process the timestamp region update
listener.onMessage(message);

// this fails if we use system time instead of cluster time, causing the value to stay invisible until clustertime == system time (which it often isn't)
assertThat("key should be visible and have value specified in timestamp message, with current cluster time.", (Long)target.get("QuerySpace", clusterTime), is(secondTimestamp));
}
}

0 comments on commit 336804f

Please sign in to comment.