Skip to content

Commit

Permalink
Adds partition lost listener configuration to ICache.
Browse files Browse the repository at this point in the history
  • Loading branch information
ibrahim gürses committed Aug 20, 2015
1 parent 0d9f846 commit bff583c
Show file tree
Hide file tree
Showing 46 changed files with 2,368 additions and 22 deletions.
2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
Expand Up @@ -76,6 +76,7 @@
<suppress checks="MethodCount" files="com/hazelcast/cache/impl/nearcache/impl/store/AbstractNearCacheRecordStore"/>
<suppress checks="NPathComplexity" files="com/hazelcast/cache/impl/eviction/impl/evaluator/AbstractEvictionPolicyEvaluator"/>
<suppress checks="MethodCount" files="com/hazelcast/cache/impl/AbstractHazelcastCacheManager"/>
<suppress checks="ClassFanOutComplexityCheck" files="com/hazelcast/cache/impl/AbstractCacheService"/>

<!-- Core -->
<suppress checks="JavadocMethod" files="com/hazelcast/core/"/>
Expand Down Expand Up @@ -189,6 +190,7 @@
<suppress checks="ParameterNumber" files="com/hazelcast/client/proxy/ClientMapReduceProxy"/>
<suppress checks="MethodCount" files="com/hazelcast/client/impl/protocol/ClientMessage"/>
<suppress checks="ClassFanOutComplexity" files="com/hazelcast/client/cache/impl/AbstractClientInternalCacheProxy"/>
<suppress checks="MethodCount" files="com/hazelcast/client/cache/impl/ClientCacheProxy"/>

<!-- Monitor -->
<suppress checks="JavadocMethod" files="com/hazelcast/monitor/"/>
Expand Down
Expand Up @@ -18,17 +18,22 @@

import com.hazelcast.cache.impl.CacheEntryProcessorResult;
import com.hazelcast.cache.impl.CacheEventListenerAdaptor;
import com.hazelcast.cache.impl.CacheEventType;
import com.hazelcast.cache.impl.CacheProxyUtil;
import com.hazelcast.cache.impl.event.CachePartitionLostEvent;
import com.hazelcast.cache.impl.event.CachePartitionLostListener;
import com.hazelcast.cache.impl.nearcache.NearCache;
import com.hazelcast.client.impl.ClientMessageDecoder;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.CacheAddEntryListenerCodec;
import com.hazelcast.client.impl.protocol.codec.CacheAddPartitionLostListenerCodec;
import com.hazelcast.client.impl.protocol.codec.CacheContainsKeyCodec;
import com.hazelcast.client.impl.protocol.codec.CacheEntryProcessorCodec;
import com.hazelcast.client.impl.protocol.codec.CacheListenerRegistrationCodec;
import com.hazelcast.client.impl.protocol.codec.CacheLoadAllCodec;
import com.hazelcast.client.impl.protocol.codec.CacheRemoveEntryListenerCodec;
import com.hazelcast.client.impl.protocol.codec.CacheRemovePartitionLostListenerCodec;
import com.hazelcast.client.spi.ClientContext;
import com.hazelcast.client.spi.ClientListenerService;
import com.hazelcast.client.spi.EventHandler;
Expand Down Expand Up @@ -385,4 +390,58 @@ public Iterator<Entry<K, V>> iterator() {
return new ClientClusterWideIterator<K, V>(this, clientContext);
}

@Override
public String addPartitionLostListener(CachePartitionLostListener listener) {
ClientMessage request = CacheAddPartitionLostListenerCodec.encodeRequest(name);
final EventHandler<ClientMessage> handler = new ClientCachePartitionLostEventHandler(listener);
return clientContext.getListenerService().startListening(request, null, handler,
new ClientMessageDecoder() {
@Override
public <T> T decodeClientMessage(ClientMessage clientMessage) {
return (T) CacheAddPartitionLostListenerCodec.decodeResponse(clientMessage).response;
}
});
}

@Override
public boolean removePartitionLostListener(String id) {
return clientContext.getListenerService().stopListening(id, new ListenerRemoveCodec() {
@Override
public ClientMessage encodeRequest(String realRegistrationId) {
return CacheRemovePartitionLostListenerCodec.encodeRequest(name, realRegistrationId);
}

@Override
public boolean decodeResponse(ClientMessage clientMessage) {
return CacheRemovePartitionLostListenerCodec.decodeResponse(clientMessage).response;
}
});
}

private class ClientCachePartitionLostEventHandler extends CacheAddPartitionLostListenerCodec.AbstractEventHandler
implements EventHandler<ClientMessage> {

private CachePartitionLostListener listener;

public ClientCachePartitionLostEventHandler(CachePartitionLostListener listener) {
this.listener = listener;
}

@Override
public void beforeListenerRegister() {

}

@Override
public void onListenerRegister() {

}

@Override
public void handle(int partitionId, String uuid) {
final Member member = clientContext.getClusterService().getMember(uuid);
listener.partitionLost(new CachePartitionLostEvent(name, member, CacheEventType.PARTITION_LOST.getType(),
partitionId));
}
}
}
@@ -0,0 +1,212 @@
package com.hazelcast.client.cache;

import com.hazelcast.cache.ICache;
import com.hazelcast.cache.impl.CacheService;
import com.hazelcast.cache.impl.HazelcastServerCachingProvider;
import com.hazelcast.cache.impl.event.CachePartitionLostEvent;
import com.hazelcast.cache.impl.event.CachePartitionLostListener;
import com.hazelcast.client.cache.impl.HazelcastClientCachingProvider;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.config.CacheConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.partition.InternalPartitionLostEvent;
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.impl.eventservice.InternalEventService;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.spi.CachingProvider;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

import static com.hazelcast.cache.impl.HazelcastServerCachingProvider.createCachingProvider;
import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
public class ClientCachePartitionLostListenerTest extends HazelcastTestSupport {

private final TestHazelcastFactory hazelcastFactory = new TestHazelcastFactory();

@After
public void tearDown() {
hazelcastFactory.terminateAll();
}

@Test
public void test_cachePartitionLostListener_registered() {

final String cacheName = randomName();

HazelcastInstance instance = hazelcastFactory.newHazelcastInstance();
final HazelcastInstance client = hazelcastFactory.newHazelcastClient();

final CachingProvider cachingProvider = HazelcastClientCachingProvider.createCachingProvider(client);
final CacheManager cacheManager = cachingProvider.getCacheManager();
final CacheConfig<Integer, String> cacheConfig = new CacheConfig<Integer, String>();
final Cache<Integer, String> cache = cacheManager.createCache(cacheName, cacheConfig);
final ICache iCache = cache.unwrap(ICache.class);

iCache.addPartitionLostListener(new CachePartitionLostListener() {
@Override
public void partitionLost(CachePartitionLostEvent event) {
}
});

assertRegistrationsSizeEventually(instance, cacheName, 1);
}

@Test
public void test_cachePartitionLostListener_invoked() {
final String cacheName = randomName();
HazelcastInstance instance = hazelcastFactory.newHazelcastInstance();
final HazelcastInstance client = hazelcastFactory.newHazelcastClient();

final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance);
final CacheManager cacheManager = cachingProvider.getCacheManager();
final CacheConfig<Integer, String> config = new CacheConfig<Integer, String>();
config.setBackupCount(0);
cacheManager.createCache(cacheName, config);

final CachingProvider clientCachingProvider = HazelcastClientCachingProvider.createCachingProvider(client);
final CacheManager clientCacheManager = clientCachingProvider.getCacheManager();
final Cache<Integer, String> cache = clientCacheManager.getCache(cacheName);

final ICache iCache = cache.unwrap(ICache.class);

final EventCollectingCachePartitionLostListener listener = new EventCollectingCachePartitionLostListener();
iCache.addPartitionLostListener(listener);

final CacheService cacheService = getNode(instance).getNodeEngine().getService(CacheService.SERVICE_NAME);
final int partitionId = 5;
cacheService.onPartitionLost(new InternalPartitionLostEvent(partitionId, 0, null));

assertCachePartitionLostEventEventually(listener, partitionId);
}

@Test
public void test_cachePartitionLostListener_invoked_fromOtherNode() {

final String cacheName = randomName();
HazelcastInstance instance1 = hazelcastFactory.newHazelcastInstance();
HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance();
final HazelcastInstance client = hazelcastFactory.newHazelcastClient();

final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance1);
final CacheManager cacheManager = cachingProvider.getCacheManager();
final CacheConfig<Integer, String> config = new CacheConfig<Integer, String>();
config.setBackupCount(0);
cacheManager.createCache(cacheName, config);

final CachingProvider clientCachingProvider = HazelcastClientCachingProvider.createCachingProvider(client);
final CacheManager clientCacheManager = clientCachingProvider.getCacheManager();
final Cache<Integer, String> cache = clientCacheManager.getCache(cacheName);

final ICache iCache = cache.unwrap(ICache.class);

final EventCollectingCachePartitionLostListener listener = new EventCollectingCachePartitionLostListener();
iCache.addPartitionLostListener(listener);

assertRegistrationsSizeEventually(instance1, cacheName, 1);
assertRegistrationsSizeEventually(instance2, cacheName, 1);

final CacheService cacheService1 = getNode(instance1).getNodeEngine().getService(CacheService.SERVICE_NAME);
final CacheService cacheService2 = getNode(instance2).getNodeEngine().getService(CacheService.SERVICE_NAME);
final int partitionId = 5;
cacheService1.onPartitionLost(new InternalPartitionLostEvent(partitionId, 0, null));
cacheService2.onPartitionLost(new InternalPartitionLostEvent(partitionId, 0, null));

assertCachePartitionLostEventEventually(listener, partitionId);
}

@Test
public void test_cachePartitionLostListener_removed() {
final String cacheName = randomName();
HazelcastInstance instance = hazelcastFactory.newHazelcastInstance();
final HazelcastInstance client = hazelcastFactory.newHazelcastClient();

final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance);
final CacheManager cacheManager = cachingProvider.getCacheManager();
final CacheConfig<Integer, String> config = new CacheConfig<Integer, String>();
config.setBackupCount(0);
cacheManager.createCache(cacheName, config);

final CachingProvider clientCachingProvider = HazelcastClientCachingProvider.createCachingProvider(client);
final CacheManager clientCacheManager = clientCachingProvider.getCacheManager();
final Cache<Integer, String> cache = clientCacheManager.getCache(cacheName);
final ICache iCache = cache.unwrap(ICache.class);

final String registrationId = iCache.addPartitionLostListener(mock(CachePartitionLostListener.class));

assertRegistrationsSizeEventually(instance, cacheName, 1);

assertTrue(iCache.removePartitionLostListener(registrationId));
assertRegistrationsSizeEventually(instance, cacheName, 0);
}

private void assertCachePartitionLostEventEventually(final EventCollectingCachePartitionLostListener listener,
final int partitionId) {
assertTrueEventually(new AssertTask() {
@Override
public void run()
throws Exception {

final List<CachePartitionLostEvent> events = listener.getEvents();
assertFalse(events.isEmpty());
assertEquals(partitionId, events.get(0).getPartitionId());

}
});
}

private void assertRegistrationsSizeEventually(final HazelcastInstance instance, final String cacheName, final int size) {
assertTrueEventually(new AssertTask() {
@Override
public void run()
throws Exception {

final InternalEventService eventService = getNode(instance).getNodeEngine().getEventService();
final Collection<EventRegistration> registrations = eventService.getRegistrations(SERVICE_NAME, cacheName);
assertEquals(size, registrations.size());

}
});
}

private class EventCollectingCachePartitionLostListener
implements CachePartitionLostListener {

private final List<CachePartitionLostEvent> events = Collections.synchronizedList(new LinkedList<CachePartitionLostEvent>());

public EventCollectingCachePartitionLostListener() {
}

@Override
public void partitionLost(CachePartitionLostEvent event) {
this.events.add(event);
}

public List<CachePartitionLostEvent> getEvents() {
synchronized (events) {
return new ArrayList<CachePartitionLostEvent>(events);
}
}
}

}
Expand Up @@ -18,13 +18,18 @@

import com.hazelcast.cache.impl.CacheEntryProcessorResult;
import com.hazelcast.cache.impl.CacheEventListenerAdaptor;
import com.hazelcast.cache.impl.CacheEventType;
import com.hazelcast.cache.impl.CacheProxyUtil;
import com.hazelcast.cache.impl.client.CacheAddEntryListenerRequest;
import com.hazelcast.cache.impl.client.CacheAddPartitionLostListenerRequest;
import com.hazelcast.cache.impl.client.CacheContainsKeyRequest;
import com.hazelcast.cache.impl.client.CacheEntryProcessorRequest;
import com.hazelcast.cache.impl.client.CacheListenerRegistrationRequest;
import com.hazelcast.cache.impl.client.CacheLoadAllRequest;
import com.hazelcast.cache.impl.client.CacheRemoveEntryListenerRequest;
import com.hazelcast.cache.impl.client.CacheRemovePartitionLostListenerRequest;
import com.hazelcast.cache.impl.event.CachePartitionLostEvent;
import com.hazelcast.cache.impl.event.CachePartitionLostListener;
import com.hazelcast.cache.impl.nearcache.NearCache;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.spi.ClientContext;
Expand All @@ -35,6 +40,7 @@
import com.hazelcast.core.Member;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.impl.PortableCachePartitionLostEvent;
import com.hazelcast.spi.impl.SerializableList;
import com.hazelcast.util.ExceptionUtil;

Expand Down Expand Up @@ -357,4 +363,52 @@ public Iterator<Entry<K, V>> iterator() {
return new ClientClusterWideIterator<K, V>(this, clientContext);
}


@Override
public String addPartitionLostListener(CachePartitionLostListener listener) {
ensureOpen();
if (listener == null) {
throw new NullPointerException("CachePartitionLostListener can't be null");
}
final EventHandler<PortableCachePartitionLostEvent> handler = new ClientCachePartitionLostEventHandler(listener);
final CacheAddPartitionLostListenerRequest registrationRequest =
new CacheAddPartitionLostListenerRequest(name);
return clientContext.getListenerService().startListening(registrationRequest, null, handler);
}

@Override
public boolean removePartitionLostListener(String id) {
ensureOpen();
if (id == null) {
throw new NullPointerException("Registration id can't be null");
}
final CacheRemovePartitionLostListenerRequest request = new CacheRemovePartitionLostListenerRequest(name, id);
return clientContext.getListenerService().stopListening(request, id);
}

private class ClientCachePartitionLostEventHandler implements EventHandler<PortableCachePartitionLostEvent> {

private CachePartitionLostListener listener;

public ClientCachePartitionLostEventHandler(CachePartitionLostListener listener) {
this.listener = listener;
}

@Override
public void handle(PortableCachePartitionLostEvent event) {
final Member member = clientContext.getClusterService().getMember(event.getUuid());
listener.partitionLost(new CachePartitionLostEvent(name, member, CacheEventType.PARTITION_LOST.getType(),
event.getPartitionId()));
}

@Override
public void beforeListenerRegister() {

}

@Override
public void onListenerRegister() {

}
}
}

0 comments on commit bff583c

Please sign in to comment.