Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mapCleared is added to EntryListener #2790

Merged
merged 11 commits into from Jun 24, 2014
Expand Up @@ -34,12 +34,12 @@
import com.hazelcast.core.ITopic;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.core.MultiMap;
import com.hazelcast.core.Partition;
import com.hazelcast.core.MapEvent;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
Expand Down Expand Up @@ -1391,6 +1391,11 @@ public void mapEvicted(MapEvent event) {
println(event);
}

@Override
public void mapCleared(MapEvent event) {
println(event);
}

@Override
public void itemAdded(ItemEvent itemEvent) {
println("Item added = " + itemEvent.getItem());
Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.MapEntrySet;
Expand Down Expand Up @@ -980,34 +981,47 @@ protected long getTimeInMillis(final long time, final TimeUnit timeunit) {
private EventHandler<PortableEntryEvent> createHandler(final EntryListener<K, V> listener, final boolean includeValue) {
return new EventHandler<PortableEntryEvent>() {
public void handle(PortableEntryEvent event) {
V value = null;
V oldValue = null;
if (includeValue) {
value = toObject(event.getValue());
oldValue = toObject(event.getOldValue());
}
K key = toObject(event.getKey());
Member member = getContext().getClusterService().getMember(event.getUuid());
EntryEvent<K, V> entryEvent = new EntryEvent<K, V>(name, member,
event.getEventType().getType(), key, oldValue, value);
switch (event.getEventType()) {
case ADDED:
listener.entryAdded(entryEvent);
listener.entryAdded(createEntryEvent(event, member));
break;
case REMOVED:
listener.entryRemoved(entryEvent);
listener.entryRemoved(createEntryEvent(event, member));
break;
case UPDATED:
listener.entryUpdated(entryEvent);
listener.entryUpdated(createEntryEvent(event, member));
break;
case EVICTED:
listener.entryEvicted(entryEvent);
listener.entryEvicted(createEntryEvent(event, member));
break;
case EVICT_ALL:
listener.mapEvicted(createMapEvent(event, member));
break;
case CLEAR_ALL:
listener.mapCleared(createMapEvent(event, member));
break;
default:
throw new IllegalArgumentException("Not a known event type " + event.getEventType());
}
}

private MapEvent createMapEvent(PortableEntryEvent event, Member member) {
return new MapEvent(name, member, event.getEventType().getType(), event.getNumberOfAffectedEntries());
}

private EntryEvent<K, V> createEntryEvent(PortableEntryEvent event, Member member) {
V value = null;
V oldValue = null;
if (includeValue) {
value = toObject(event.getValue());
oldValue = toObject(event.getOldValue());
}
K key = toObject(event.getKey());
return new EntryEvent<K, V>(name, member,
event.getEventType().getType(), key, oldValue, value);
}

@Override
public void onListenerRegister() {

Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import com.hazelcast.core.MultiMap;
import com.hazelcast.mapreduce.Collator;
Expand Down Expand Up @@ -323,33 +324,38 @@ protected long getTimeInMillis(final long time, final TimeUnit timeunit) {
private EventHandler<PortableEntryEvent> createHandler(final EntryListener<K, V> listener, final boolean includeValue) {
return new EventHandler<PortableEntryEvent>() {
public void handle(PortableEntryEvent event) {
V value = null;
V oldValue = null;
if (includeValue) {
value = (V) toObject(event.getValue());
oldValue = (V) toObject(event.getOldValue());
}
K key = (K) toObject(event.getKey());
Member member = getContext().getClusterService().getMember(event.getUuid());
EntryEvent<K, V> entryEvent = new EntryEvent<K, V>(name, member,
event.getEventType().getType(), key, oldValue, value);
switch (event.getEventType()) {
case ADDED:
listener.entryAdded(entryEvent);
listener.entryAdded(createEntryEvent(event, member));
break;
case REMOVED:
listener.entryRemoved(entryEvent);
break;
case UPDATED:
listener.entryUpdated(entryEvent);
listener.entryRemoved(createEntryEvent(event, member));
break;
case EVICTED:
listener.entryEvicted(entryEvent);
case CLEAR_ALL:
listener.mapCleared(createMapEvent(event, member));
break;
default:
throw new IllegalArgumentException("Not a known event type " + event.getEventType());
}
}

private MapEvent createMapEvent(PortableEntryEvent event, Member member) {
return new MapEvent(name, member, event.getEventType().getType(), event.getNumberOfAffectedEntries());
}

private EntryEvent<K, V> createEntryEvent(PortableEntryEvent event, Member member) {
V value = null;
V oldValue = null;
if (includeValue) {
value = toObject(event.getValue());
oldValue = toObject(event.getOldValue());
}
K key = toObject(event.getKey());
return new EntryEvent<K, V>(name, member,
event.getEventType().getType(), key, oldValue, value);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you can decrease code duplication later.

@Override
public void onListenerRegister() {
}
Expand Down
Expand Up @@ -1147,10 +1147,9 @@ public void entryUpdated(EntryEvent event) {
}
public void entryEvicted(EntryEvent event) {
}

@Override
public void mapEvicted(MapEvent event) {
// TODO what to do here?
}
public void mapCleared(MapEvent event) {
}
}

Expand Down
@@ -0,0 +1,89 @@
package com.hazelcast.client.map;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MapEvent;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

@RunWith(HazelcastSerialClassRunner.class)
@Category(QuickTest.class)
public class ClientMapEvictAllTest extends HazelcastTestSupport {

@Test
public void evictAll_firesEvent() throws Exception {
final String mapName = randomMapName();
final HazelcastInstance server1 = Hazelcast.newHazelcastInstance(null);
final HazelcastInstance server2 = Hazelcast.newHazelcastInstance(null);
final HazelcastInstance client = HazelcastClient.newHazelcastClient();
final IMap<Object, Object> map = client.getMap(mapName);
final CountDownLatch evictedEntryCount = new CountDownLatch(3);
map.addEntryListener(new EntryAdapter<Object, Object>() {
@Override
public void mapEvicted(MapEvent event) {
final int affected = event.getNumberOfEntriesAffected();
for (int i = 0; i < affected; i++) {
evictedEntryCount.countDown();
}

}
}, true);

map.put(1, 1);
map.put(2, 1);
map.put(3, 1);
map.evictAll();

assertOpenEventually(evictedEntryCount);
assertEquals(0, map.size());
closeResources(client, server1, server2);
}

@Test
public void evictAll_firesOnlyOneEvent() throws Exception {
final String mapName = randomMapName();
final HazelcastInstance server1 = Hazelcast.newHazelcastInstance(null);
final HazelcastInstance client = HazelcastClient.newHazelcastClient();
final IMap<Object, Object> map = client.getMap(mapName);
final CountDownLatch eventCount = new CountDownLatch(2);
map.addEntryListener(new EntryAdapter<Object, Object>() {
@Override
public void mapEvicted(MapEvent event) {
eventCount.countDown();

}
}, true);

map.put(1, 1);
map.put(2, 1);
map.put(3, 1);
map.evictAll();

assertFalse(eventCount.await(10, TimeUnit.SECONDS));
assertEquals(1, eventCount.getCount());
closeResources(client, server1);
}


private static void closeResources(HazelcastInstance... instances) {
if (instances == null) {
return;
}
for (HazelcastInstance instance : instances) {
instance.shutdown();
}
}
}
Expand Up @@ -87,6 +87,11 @@ public void entryEvicted(EntryEvent<Object, Object> event) {
public void mapEvicted(MapEvent event) {

}

@Override
public void mapCleared(MapEvent event) {

}
}, true);
HazelcastInstance instance2 = Hazelcast.newHazelcastInstance();
instance1.getLifecycleService().terminate();
Expand Down
Expand Up @@ -27,10 +27,11 @@
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.MapStoreAdapter;
import com.hazelcast.core.MultiMap;
import com.hazelcast.core.PartitionAware;
import com.hazelcast.map.AbstractEntryProcessor;
import com.hazelcast.core.MapEvent;
import com.hazelcast.monitor.LocalMapStats;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
Expand Down Expand Up @@ -59,7 +60,11 @@

import static com.hazelcast.test.HazelcastTestSupport.assertOpenEventually;
import static com.hazelcast.test.HazelcastTestSupport.randomString;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@RunWith(HazelcastParallelClassRunner.class)
@Category(QuickTest.class)
Expand Down Expand Up @@ -108,6 +113,7 @@ public void testIssue537() throws InterruptedException {
public void entryAdded(EntryEvent event) {
latch.countDown();
}

public void entryEvicted(EntryEvent event) {
final Object value = event.getValue();
final Object oldValue = event.getOldValue();
Expand Down Expand Up @@ -680,6 +686,20 @@ public void testExecuteOnKeys() throws Exception {
assertEquals(1, (int) map.get(9));
}

@Test
public void testListeners_clearAllFromNode() {
final String name = randomString();
final MultiMap mm = client.getMultiMap(name);
final CountDownLatch gateClearAll = new CountDownLatch(1);
final CountDownLatch gateAdd = new CountDownLatch(1);
final EntryListener listener = new EntListener(gateAdd, null, null, null, gateClearAll, null);
mm.addEntryListener(listener, false);
mm.put("key", "value");
server.getMultiMap(name).clear();
assertOpenEventually(gateAdd);
assertOpenEventually(gateClearAll);
}

/**
* Issue #996
*/
Expand All @@ -689,6 +709,8 @@ public void testEntryListener() throws InterruptedException {
final CountDownLatch gateRemove = new CountDownLatch(1);
final CountDownLatch gateEvict = new CountDownLatch(1);
final CountDownLatch gateUpdate = new CountDownLatch(1);
final CountDownLatch gateClearAll = new CountDownLatch(1);
final CountDownLatch gateEvictAll = new CountDownLatch(1);

final String mapName = randomString();

Expand All @@ -699,7 +721,7 @@ public void testEntryListener() throws InterruptedException {

assertEquals(1, clientMap.size());

final EntryListener listener = new EntListener(gateAdd, gateRemove, gateEvict, gateUpdate);
final EntryListener listener = new EntListener(gateAdd, gateRemove, gateEvict, gateUpdate, gateClearAll, gateEvictAll);

clientMap.addEntryListener(listener, new SqlPredicate("id=1"), 2, true);
clientMap.put(2, new Deal(1));
Expand All @@ -709,23 +731,33 @@ public void testEntryListener() throws InterruptedException {
clientMap.put(2, new Deal(1));
clientMap.evict(2);

clientMap.clear();
clientMap.evictAll();

assertTrue(gateAdd.await(10, TimeUnit.SECONDS));
assertTrue(gateRemove.await(10, TimeUnit.SECONDS));
assertTrue(gateEvict.await(10, TimeUnit.SECONDS));
assertTrue(gateUpdate.await(10, TimeUnit.SECONDS));
assertTrue(gateClearAll.await(10, TimeUnit.SECONDS));
assertTrue(gateEvictAll.await(10, TimeUnit.SECONDS));
}

static class EntListener implements EntryListener<Integer, Deal>, Serializable {
private final CountDownLatch _gateAdd;
private final CountDownLatch _gateRemove;
private final CountDownLatch _gateEvict;
private final CountDownLatch _gateUpdate;
private final CountDownLatch _gateClearAll;
private final CountDownLatch _gateEvictAll;

EntListener(CountDownLatch gateAdd, CountDownLatch gateRemove, CountDownLatch gateEvict, CountDownLatch gateUpdate) {
EntListener(CountDownLatch gateAdd, CountDownLatch gateRemove, CountDownLatch gateEvict,
CountDownLatch gateUpdate, CountDownLatch gateClearAll, CountDownLatch gateEvictAll) {
_gateAdd = gateAdd;
_gateRemove = gateRemove;
_gateEvict = gateEvict;
_gateUpdate = gateUpdate;
_gateClearAll = gateClearAll;
_gateEvictAll = gateEvictAll;
}

@Override
Expand All @@ -740,7 +772,12 @@ public void entryEvicted(EntryEvent<Integer, Deal> arg0) {

@Override
public void mapEvicted(MapEvent event) {
// TODO what to do here?
_gateEvictAll.countDown();
}

@Override
public void mapCleared(MapEvent event) {
_gateClearAll.countDown();
}

@Override
Expand Down