From bff583ccdbfe79a846f5f8ee5794c4e0992a518e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?ibrahim=20gu=CC=88rses?= Date: Thu, 20 Aug 2015 11:47:29 +0300 Subject: [PATCH] Adds partition lost listener configuration to ICache. --- checkstyle/suppressions.xml | 2 + .../client/cache/impl/ClientCacheProxy.java | 59 +++++ .../ClientCachePartitionLostListenerTest.java | 212 ++++++++++++++++++ .../client/cache/impl/ClientCacheProxy.java | 54 +++++ .../ClientCachePartitionLostListenerTest.java | 212 ++++++++++++++++++ .../HazelcastConfigBeanDefinitionParser.java | 16 +- .../main/resources/hazelcast-spring-3.6.xsd | 12 + .../DummyCachePartitionLostListenerImpl.java | 12 + .../hazelcast/spring/context/TestJCache.java | 32 ++- .../test-jcache-application-context.xml | 5 + .../main/java/com/hazelcast/cache/ICache.java | 29 +++ .../cache/impl/AbstractCacheService.java | 42 +++- .../impl/AbstractInternalCacheProxy.java | 38 ++++ .../hazelcast/cache/impl/CacheEventType.java | 9 +- .../cache/impl/CachePartitionEventData.java | 68 ++++++ .../cache/impl/CachePortableHook.java | 16 +- .../com/hazelcast/cache/impl/CacheProxy.java | 27 +++ .../CacheAddPartitionLostListenerRequest.java | 119 ++++++++++ ...cheRemovePartitionLostListenerRequest.java | 65 ++++++ .../cache/impl/event/AbstractICacheEvent.java | 103 +++++++++ .../impl/event/CachePartitionLostEvent.java | 54 +++++ .../event/CachePartitionLostEventFilter.java | 55 +++++ .../event/CachePartitionLostListener.java | 35 +++ .../cache/impl/event/ICacheEvent.java | 49 ++++ ...rnalCachePartitionLostListenerAdapter.java | 41 ++++ .../cache/impl/event/package-info.java | 23 ++ .../impl/protocol/EventMessageConst.java | 2 + ...heAddPartitionLostListenerMessageTask.java | 107 +++++++++ ...emovePartitionLostListenerMessageTask.java | 79 +++++++ .../protocol/template/CacheCodecTemplate.java | 7 + .../template/EventResponseTemplate.java | 2 + .../com/hazelcast/config/CacheConfig.java | 40 +++- .../CachePartitionLostListenerConfig.java | 94 ++++++++ ...hePartitionLostListenerConfigReadOnly.java | 49 ++++ .../hazelcast/config/CacheSimpleConfig.java | 37 +++ .../hazelcast/config/ConfigXmlGenerator.java | 18 ++ .../hazelcast/config/XmlConfigBuilder.java | 12 + .../impl/PortableCachePartitionLostEvent.java | 71 ++++++ .../hazelcast/spi/impl/SpiPortableHook.java | 3 + .../main/resources/hazelcast-config-3.6.xsd | 1 + .../CachePartitionLostListenerConfigTest.java | 78 +++++++ .../CachePartitionLostListenerStressTest.java | 175 +++++++++++++++ .../cache/CachePartitionLostListenerTest.java | 152 +++++++++++++ .../config/XMLConfigBuilderTest.java | 37 +++ .../AbstractPartitionLostListenerTest.java | 4 + ...zelcast-jcache-partition-lost-listener.xml | 33 +++ 46 files changed, 2368 insertions(+), 22 deletions(-) create mode 100644 hazelcast-client-new/src/test/java/com/hazelcast/client/cache/ClientCachePartitionLostListenerTest.java create mode 100644 hazelcast-client/src/test/java/com/hazelcast/client/cache/ClientCachePartitionLostListenerTest.java create mode 100644 hazelcast-spring/src/test/java/com/hazelcast/spring/DummyCachePartitionLostListenerImpl.java create mode 100644 hazelcast/src/main/java/com/hazelcast/cache/impl/CachePartitionEventData.java create mode 100644 hazelcast/src/main/java/com/hazelcast/cache/impl/client/CacheAddPartitionLostListenerRequest.java create mode 100644 hazelcast/src/main/java/com/hazelcast/cache/impl/client/CacheRemovePartitionLostListenerRequest.java create mode 100644 hazelcast/src/main/java/com/hazelcast/cache/impl/event/AbstractICacheEvent.java create mode 100644 hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostEvent.java create mode 100644 hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostEventFilter.java create mode 100644 hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostListener.java create mode 100644 hazelcast/src/main/java/com/hazelcast/cache/impl/event/ICacheEvent.java create mode 100644 hazelcast/src/main/java/com/hazelcast/cache/impl/event/InternalCachePartitionLostListenerAdapter.java create mode 100644 hazelcast/src/main/java/com/hazelcast/cache/impl/event/package-info.java create mode 100644 hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheAddPartitionLostListenerMessageTask.java create mode 100644 hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheRemovePartitionLostListenerMessageTask.java create mode 100644 hazelcast/src/main/java/com/hazelcast/config/CachePartitionLostListenerConfig.java create mode 100644 hazelcast/src/main/java/com/hazelcast/config/CachePartitionLostListenerConfigReadOnly.java create mode 100644 hazelcast/src/main/java/com/hazelcast/spi/impl/PortableCachePartitionLostEvent.java create mode 100644 hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerConfigTest.java create mode 100644 hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerStressTest.java create mode 100644 hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerTest.java create mode 100644 hazelcast/src/test/resources/test-hazelcast-jcache-partition-lost-listener.xml diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index ce8af10de7e9..21c20e55fdd4 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -76,6 +76,7 @@ + @@ -189,6 +190,7 @@ + diff --git a/hazelcast-client-new/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java b/hazelcast-client-new/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java index f3c46586a9f0..493428997f60 100644 --- a/hazelcast-client-new/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java +++ b/hazelcast-client-new/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java @@ -18,17 +18,22 @@ import com.hazelcast.cache.impl.CacheEntryProcessorResult; import com.hazelcast.cache.impl.CacheEventListenerAdaptor; +import com.hazelcast.cache.impl.CacheEventType; import com.hazelcast.cache.impl.CacheProxyUtil; +import com.hazelcast.cache.impl.event.CachePartitionLostEvent; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; import com.hazelcast.cache.impl.nearcache.NearCache; import com.hazelcast.client.impl.ClientMessageDecoder; import com.hazelcast.client.impl.HazelcastClientInstanceImpl; import com.hazelcast.client.impl.protocol.ClientMessage; import com.hazelcast.client.impl.protocol.codec.CacheAddEntryListenerCodec; +import com.hazelcast.client.impl.protocol.codec.CacheAddPartitionLostListenerCodec; import com.hazelcast.client.impl.protocol.codec.CacheContainsKeyCodec; import com.hazelcast.client.impl.protocol.codec.CacheEntryProcessorCodec; import com.hazelcast.client.impl.protocol.codec.CacheListenerRegistrationCodec; import com.hazelcast.client.impl.protocol.codec.CacheLoadAllCodec; import com.hazelcast.client.impl.protocol.codec.CacheRemoveEntryListenerCodec; +import com.hazelcast.client.impl.protocol.codec.CacheRemovePartitionLostListenerCodec; import com.hazelcast.client.spi.ClientContext; import com.hazelcast.client.spi.ClientListenerService; import com.hazelcast.client.spi.EventHandler; @@ -385,4 +390,58 @@ public Iterator> iterator() { return new ClientClusterWideIterator(this, clientContext); } + @Override + public String addPartitionLostListener(CachePartitionLostListener listener) { + ClientMessage request = CacheAddPartitionLostListenerCodec.encodeRequest(name); + final EventHandler handler = new ClientCachePartitionLostEventHandler(listener); + return clientContext.getListenerService().startListening(request, null, handler, + new ClientMessageDecoder() { + @Override + public T decodeClientMessage(ClientMessage clientMessage) { + return (T) CacheAddPartitionLostListenerCodec.decodeResponse(clientMessage).response; + } + }); + } + + @Override + public boolean removePartitionLostListener(String id) { + return clientContext.getListenerService().stopListening(id, new ListenerRemoveCodec() { + @Override + public ClientMessage encodeRequest(String realRegistrationId) { + return CacheRemovePartitionLostListenerCodec.encodeRequest(name, realRegistrationId); + } + + @Override + public boolean decodeResponse(ClientMessage clientMessage) { + return CacheRemovePartitionLostListenerCodec.decodeResponse(clientMessage).response; + } + }); + } + + private class ClientCachePartitionLostEventHandler extends CacheAddPartitionLostListenerCodec.AbstractEventHandler + implements EventHandler { + + private CachePartitionLostListener listener; + + public ClientCachePartitionLostEventHandler(CachePartitionLostListener listener) { + this.listener = listener; + } + + @Override + public void beforeListenerRegister() { + + } + + @Override + public void onListenerRegister() { + + } + + @Override + public void handle(int partitionId, String uuid) { + final Member member = clientContext.getClusterService().getMember(uuid); + listener.partitionLost(new CachePartitionLostEvent(name, member, CacheEventType.PARTITION_LOST.getType(), + partitionId)); + } + } } diff --git a/hazelcast-client-new/src/test/java/com/hazelcast/client/cache/ClientCachePartitionLostListenerTest.java b/hazelcast-client-new/src/test/java/com/hazelcast/client/cache/ClientCachePartitionLostListenerTest.java new file mode 100644 index 000000000000..f9a3bbeca499 --- /dev/null +++ b/hazelcast-client-new/src/test/java/com/hazelcast/client/cache/ClientCachePartitionLostListenerTest.java @@ -0,0 +1,212 @@ +package com.hazelcast.client.cache; + +import com.hazelcast.cache.ICache; +import com.hazelcast.cache.impl.CacheService; +import com.hazelcast.cache.impl.HazelcastServerCachingProvider; +import com.hazelcast.cache.impl.event.CachePartitionLostEvent; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; +import com.hazelcast.client.cache.impl.HazelcastClientCachingProvider; +import com.hazelcast.client.test.TestHazelcastFactory; +import com.hazelcast.config.CacheConfig; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.partition.InternalPartitionLostEvent; +import com.hazelcast.spi.EventRegistration; +import com.hazelcast.spi.impl.eventservice.InternalEventService; +import com.hazelcast.test.AssertTask; +import com.hazelcast.test.HazelcastParallelClassRunner; +import com.hazelcast.test.HazelcastTestSupport; +import com.hazelcast.test.annotation.ParallelTest; +import com.hazelcast.test.annotation.QuickTest; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.spi.CachingProvider; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import static com.hazelcast.cache.impl.HazelcastServerCachingProvider.createCachingProvider; +import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +@RunWith(HazelcastParallelClassRunner.class) +@Category({QuickTest.class, ParallelTest.class}) +public class ClientCachePartitionLostListenerTest extends HazelcastTestSupport { + + private final TestHazelcastFactory hazelcastFactory = new TestHazelcastFactory(); + + @After + public void tearDown() { + hazelcastFactory.terminateAll(); + } + + @Test + public void test_cachePartitionLostListener_registered() { + + final String cacheName = randomName(); + + HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(); + final HazelcastInstance client = hazelcastFactory.newHazelcastClient(); + + final CachingProvider cachingProvider = HazelcastClientCachingProvider.createCachingProvider(client); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final CacheConfig cacheConfig = new CacheConfig(); + final Cache cache = cacheManager.createCache(cacheName, cacheConfig); + final ICache iCache = cache.unwrap(ICache.class); + + iCache.addPartitionLostListener(new CachePartitionLostListener() { + @Override + public void partitionLost(CachePartitionLostEvent event) { + } + }); + + assertRegistrationsSizeEventually(instance, cacheName, 1); + } + + @Test + public void test_cachePartitionLostListener_invoked() { + final String cacheName = randomName(); + HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(); + final HazelcastInstance client = hazelcastFactory.newHazelcastClient(); + + final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final CacheConfig config = new CacheConfig(); + config.setBackupCount(0); + cacheManager.createCache(cacheName, config); + + final CachingProvider clientCachingProvider = HazelcastClientCachingProvider.createCachingProvider(client); + final CacheManager clientCacheManager = clientCachingProvider.getCacheManager(); + final Cache cache = clientCacheManager.getCache(cacheName); + + final ICache iCache = cache.unwrap(ICache.class); + + final EventCollectingCachePartitionLostListener listener = new EventCollectingCachePartitionLostListener(); + iCache.addPartitionLostListener(listener); + + final CacheService cacheService = getNode(instance).getNodeEngine().getService(CacheService.SERVICE_NAME); + final int partitionId = 5; + cacheService.onPartitionLost(new InternalPartitionLostEvent(partitionId, 0, null)); + + assertCachePartitionLostEventEventually(listener, partitionId); + } + + @Test + public void test_cachePartitionLostListener_invoked_fromOtherNode() { + + final String cacheName = randomName(); + HazelcastInstance instance1 = hazelcastFactory.newHazelcastInstance(); + HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance(); + final HazelcastInstance client = hazelcastFactory.newHazelcastClient(); + + final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance1); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final CacheConfig config = new CacheConfig(); + config.setBackupCount(0); + cacheManager.createCache(cacheName, config); + + final CachingProvider clientCachingProvider = HazelcastClientCachingProvider.createCachingProvider(client); + final CacheManager clientCacheManager = clientCachingProvider.getCacheManager(); + final Cache cache = clientCacheManager.getCache(cacheName); + + final ICache iCache = cache.unwrap(ICache.class); + + final EventCollectingCachePartitionLostListener listener = new EventCollectingCachePartitionLostListener(); + iCache.addPartitionLostListener(listener); + + assertRegistrationsSizeEventually(instance1, cacheName, 1); + assertRegistrationsSizeEventually(instance2, cacheName, 1); + + final CacheService cacheService1 = getNode(instance1).getNodeEngine().getService(CacheService.SERVICE_NAME); + final CacheService cacheService2 = getNode(instance2).getNodeEngine().getService(CacheService.SERVICE_NAME); + final int partitionId = 5; + cacheService1.onPartitionLost(new InternalPartitionLostEvent(partitionId, 0, null)); + cacheService2.onPartitionLost(new InternalPartitionLostEvent(partitionId, 0, null)); + + assertCachePartitionLostEventEventually(listener, partitionId); + } + + @Test + public void test_cachePartitionLostListener_removed() { + final String cacheName = randomName(); + HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(); + final HazelcastInstance client = hazelcastFactory.newHazelcastClient(); + + final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final CacheConfig config = new CacheConfig(); + config.setBackupCount(0); + cacheManager.createCache(cacheName, config); + + final CachingProvider clientCachingProvider = HazelcastClientCachingProvider.createCachingProvider(client); + final CacheManager clientCacheManager = clientCachingProvider.getCacheManager(); + final Cache cache = clientCacheManager.getCache(cacheName); + final ICache iCache = cache.unwrap(ICache.class); + + final String registrationId = iCache.addPartitionLostListener(mock(CachePartitionLostListener.class)); + + assertRegistrationsSizeEventually(instance, cacheName, 1); + + assertTrue(iCache.removePartitionLostListener(registrationId)); + assertRegistrationsSizeEventually(instance, cacheName, 0); + } + + private void assertCachePartitionLostEventEventually(final EventCollectingCachePartitionLostListener listener, + final int partitionId) { + assertTrueEventually(new AssertTask() { + @Override + public void run() + throws Exception { + + final List events = listener.getEvents(); + assertFalse(events.isEmpty()); + assertEquals(partitionId, events.get(0).getPartitionId()); + + } + }); + } + + private void assertRegistrationsSizeEventually(final HazelcastInstance instance, final String cacheName, final int size) { + assertTrueEventually(new AssertTask() { + @Override + public void run() + throws Exception { + + final InternalEventService eventService = getNode(instance).getNodeEngine().getEventService(); + final Collection registrations = eventService.getRegistrations(SERVICE_NAME, cacheName); + assertEquals(size, registrations.size()); + + } + }); + } + + private class EventCollectingCachePartitionLostListener + implements CachePartitionLostListener { + + private final List events = Collections.synchronizedList(new LinkedList()); + + public EventCollectingCachePartitionLostListener() { + } + + @Override + public void partitionLost(CachePartitionLostEvent event) { + this.events.add(event); + } + + public List getEvents() { + synchronized (events) { + return new ArrayList(events); + } + } + } + +} diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java b/hazelcast-client/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java index 127d69259dbe..15a1d4be9b09 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/cache/impl/ClientCacheProxy.java @@ -18,13 +18,18 @@ import com.hazelcast.cache.impl.CacheEntryProcessorResult; import com.hazelcast.cache.impl.CacheEventListenerAdaptor; +import com.hazelcast.cache.impl.CacheEventType; import com.hazelcast.cache.impl.CacheProxyUtil; import com.hazelcast.cache.impl.client.CacheAddEntryListenerRequest; +import com.hazelcast.cache.impl.client.CacheAddPartitionLostListenerRequest; import com.hazelcast.cache.impl.client.CacheContainsKeyRequest; import com.hazelcast.cache.impl.client.CacheEntryProcessorRequest; import com.hazelcast.cache.impl.client.CacheListenerRegistrationRequest; import com.hazelcast.cache.impl.client.CacheLoadAllRequest; import com.hazelcast.cache.impl.client.CacheRemoveEntryListenerRequest; +import com.hazelcast.cache.impl.client.CacheRemovePartitionLostListenerRequest; +import com.hazelcast.cache.impl.event.CachePartitionLostEvent; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; import com.hazelcast.cache.impl.nearcache.NearCache; import com.hazelcast.client.impl.HazelcastClientInstanceImpl; import com.hazelcast.client.spi.ClientContext; @@ -35,6 +40,7 @@ import com.hazelcast.core.Member; import com.hazelcast.nio.Address; import com.hazelcast.nio.serialization.Data; +import com.hazelcast.spi.impl.PortableCachePartitionLostEvent; import com.hazelcast.spi.impl.SerializableList; import com.hazelcast.util.ExceptionUtil; @@ -357,4 +363,52 @@ public Iterator> iterator() { return new ClientClusterWideIterator(this, clientContext); } + + @Override + public String addPartitionLostListener(CachePartitionLostListener listener) { + ensureOpen(); + if (listener == null) { + throw new NullPointerException("CachePartitionLostListener can't be null"); + } + final EventHandler handler = new ClientCachePartitionLostEventHandler(listener); + final CacheAddPartitionLostListenerRequest registrationRequest = + new CacheAddPartitionLostListenerRequest(name); + return clientContext.getListenerService().startListening(registrationRequest, null, handler); + } + + @Override + public boolean removePartitionLostListener(String id) { + ensureOpen(); + if (id == null) { + throw new NullPointerException("Registration id can't be null"); + } + final CacheRemovePartitionLostListenerRequest request = new CacheRemovePartitionLostListenerRequest(name, id); + return clientContext.getListenerService().stopListening(request, id); + } + + private class ClientCachePartitionLostEventHandler implements EventHandler { + + private CachePartitionLostListener listener; + + public ClientCachePartitionLostEventHandler(CachePartitionLostListener listener) { + this.listener = listener; + } + + @Override + public void handle(PortableCachePartitionLostEvent event) { + final Member member = clientContext.getClusterService().getMember(event.getUuid()); + listener.partitionLost(new CachePartitionLostEvent(name, member, CacheEventType.PARTITION_LOST.getType(), + event.getPartitionId())); + } + + @Override + public void beforeListenerRegister() { + + } + + @Override + public void onListenerRegister() { + + } + } } diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/cache/ClientCachePartitionLostListenerTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/cache/ClientCachePartitionLostListenerTest.java new file mode 100644 index 000000000000..f6750b4b028f --- /dev/null +++ b/hazelcast-client/src/test/java/com/hazelcast/client/cache/ClientCachePartitionLostListenerTest.java @@ -0,0 +1,212 @@ +package com.hazelcast.client.cache; + +import com.hazelcast.cache.ICache; +import com.hazelcast.cache.impl.CacheService; +import com.hazelcast.cache.impl.HazelcastServerCachingProvider; +import com.hazelcast.cache.impl.event.CachePartitionLostEvent; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; +import com.hazelcast.client.cache.impl.HazelcastClientCachingProvider; +import com.hazelcast.client.test.TestHazelcastFactory; +import com.hazelcast.config.CacheConfig; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.partition.InternalPartitionLostEvent; +import com.hazelcast.spi.EventRegistration; +import com.hazelcast.spi.impl.eventservice.InternalEventService; +import com.hazelcast.test.AssertTask; +import com.hazelcast.test.HazelcastParallelClassRunner; +import com.hazelcast.test.HazelcastTestSupport; +import com.hazelcast.test.annotation.ParallelTest; +import com.hazelcast.test.annotation.QuickTest; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.spi.CachingProvider; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import static com.hazelcast.cache.impl.HazelcastServerCachingProvider.createCachingProvider; +import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +@RunWith(HazelcastParallelClassRunner.class) +@Category({QuickTest.class, ParallelTest.class}) +public class ClientCachePartitionLostListenerTest extends HazelcastTestSupport { + + private final TestHazelcastFactory hazelcastFactory = new TestHazelcastFactory(); + + @After + public void tearDown() { + hazelcastFactory.terminateAll(); + } + + @Test + public void test_cachePartitionLostListener_registered() { + + final String cacheName = randomName(); + + HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(); + final HazelcastInstance client = hazelcastFactory.newHazelcastClient(); + + final CachingProvider cachingProvider = HazelcastClientCachingProvider.createCachingProvider(client); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final CacheConfig cacheConfig = new CacheConfig(); + final Cache cache = cacheManager.createCache(cacheName, cacheConfig); + final ICache iCache = cache.unwrap(ICache.class); + + iCache.addPartitionLostListener(new CachePartitionLostListener() { + @Override + public void partitionLost(CachePartitionLostEvent event) { + } + }); + + assertRegistrationsSizeEventually(instance, cacheName, 1); + } + + @Test + public void test_cachePartitionLostListener_invoked() { + final String cacheName = randomName(); + HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(); + final HazelcastInstance client = hazelcastFactory.newHazelcastClient(); + + final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final CacheConfig config = new CacheConfig(); + config.setBackupCount(0); + cacheManager.createCache(cacheName, config); + + final CachingProvider clientCachingProvider = HazelcastClientCachingProvider.createCachingProvider(client); + final CacheManager clientCacheManager = clientCachingProvider.getCacheManager(); + final Cache cache = clientCacheManager.getCache(cacheName); + + final ICache iCache = cache.unwrap(ICache.class); + + final EventCollectingCachePartitionLostListener listener = new EventCollectingCachePartitionLostListener(); + iCache.addPartitionLostListener(listener); + + final CacheService cacheService = getNode(instance).getNodeEngine().getService(CacheService.SERVICE_NAME); + final int partitionId = 5; + cacheService.onPartitionLost(new InternalPartitionLostEvent(partitionId, 0, null)); + + assertCachePartitionLostEventEventually(listener, partitionId); + } + + @Test + public void test_cachePartitionLostListener_invoked_fromOtherNode() { + + final String cacheName = randomName(); + HazelcastInstance instance1 = hazelcastFactory.newHazelcastInstance(); + HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance(); + final HazelcastInstance client = hazelcastFactory.newHazelcastClient(); + + final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance1); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final CacheConfig config = new CacheConfig(); + config.setBackupCount(0); + cacheManager.createCache(cacheName, config); + + final CachingProvider clientCachingProvider = HazelcastClientCachingProvider.createCachingProvider(client); + final CacheManager clientCacheManager = clientCachingProvider.getCacheManager(); + final Cache cache = clientCacheManager.getCache(cacheName); + + final ICache iCache = cache.unwrap(ICache.class); + + final EventCollectingCachePartitionLostListener listener = new EventCollectingCachePartitionLostListener(); + iCache.addPartitionLostListener(listener); + + assertRegistrationsSizeEventually(instance1, cacheName, 1); + assertRegistrationsSizeEventually(instance2, cacheName, 1); + + final CacheService cacheService1 = getNode(instance1).getNodeEngine().getService(CacheService.SERVICE_NAME); + final CacheService cacheService2 = getNode(instance2).getNodeEngine().getService(CacheService.SERVICE_NAME); + final int partitionId = 5; + cacheService1.onPartitionLost(new InternalPartitionLostEvent(partitionId, 0, null)); + cacheService2.onPartitionLost(new InternalPartitionLostEvent(partitionId, 0, null)); + + assertCachePartitionLostEventEventually(listener, partitionId); + } + + @Test + public void test_cachePartitionLostListener_removed() { + final String cacheName = randomName(); + HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(); + final HazelcastInstance client = hazelcastFactory.newHazelcastClient(); + + final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final CacheConfig config = new CacheConfig(); + config.setBackupCount(0); + cacheManager.createCache(cacheName, config); + + final CachingProvider clientCachingProvider = HazelcastClientCachingProvider.createCachingProvider(client); + final CacheManager clientCacheManager = clientCachingProvider.getCacheManager(); + final Cache cache = clientCacheManager.getCache(cacheName); + final ICache iCache = cache.unwrap(ICache.class); + + final String registrationId = iCache.addPartitionLostListener(mock(CachePartitionLostListener.class)); + + assertRegistrationsSizeEventually(instance, cacheName, 1); + + assertTrue(iCache.removePartitionLostListener(registrationId)); + assertRegistrationsSizeEventually(instance, cacheName, 0); + } + + private void assertCachePartitionLostEventEventually(final EventCollectingCachePartitionLostListener listener, + final int partitionId) { + assertTrueEventually(new AssertTask() { + @Override + public void run() + throws Exception { + + final List events = listener.getEvents(); + assertFalse(events.isEmpty()); + assertEquals(partitionId, events.get(0).getPartitionId()); + + } + }); + } + + private void assertRegistrationsSizeEventually(final HazelcastInstance instance, final String cacheName, final int size) { + assertTrueEventually(new AssertTask() { + @Override + public void run() + throws Exception { + + final InternalEventService eventService = getNode(instance).getNodeEngine().getEventService(); + final Collection registrations = eventService.getRegistrations(SERVICE_NAME, cacheName); + assertEquals(size, registrations.size()); + + } + }); + } + + private class EventCollectingCachePartitionLostListener + implements CachePartitionLostListener { + + private final List events = Collections.synchronizedList(new LinkedList()); + + public EventCollectingCachePartitionLostListener() { + } + + @Override + public void partitionLost(CachePartitionLostEvent event) { + this.events.add(event); + } + + public List getEvents() { + synchronized (events) { + return new ArrayList(events); + } + } + } + +} diff --git a/hazelcast-spring/src/main/java/com/hazelcast/spring/HazelcastConfigBeanDefinitionParser.java b/hazelcast-spring/src/main/java/com/hazelcast/spring/HazelcastConfigBeanDefinitionParser.java index a620b0248744..05244125bd88 100644 --- a/hazelcast-spring/src/main/java/com/hazelcast/spring/HazelcastConfigBeanDefinitionParser.java +++ b/hazelcast-spring/src/main/java/com/hazelcast/spring/HazelcastConfigBeanDefinitionParser.java @@ -17,7 +17,12 @@ package com.hazelcast.spring; import com.hazelcast.config.AwsConfig; +import com.hazelcast.config.CachePartitionLostListenerConfig; import com.hazelcast.config.CacheSimpleConfig; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.DurationConfig; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig.ExpiryPolicyType; import com.hazelcast.config.CacheSimpleEntryListenerConfig; import com.hazelcast.config.Config; import com.hazelcast.config.CredentialsFactoryConfig; @@ -74,11 +79,6 @@ import com.hazelcast.quorum.QuorumType; import com.hazelcast.spi.ServiceConfigurationParser; import com.hazelcast.util.ExceptionUtil; -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig; -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig; -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.DurationConfig; -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig.ExpiryPolicyType; - import com.hazelcast.util.StringUtil; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; @@ -707,8 +707,14 @@ public void handleCache(Node node) { .getBeanDefinition(); fillValues(childNode, wanReplicationRefBuilder); cacheConfigBuilder.addPropertyValue("wanReplicationRef", wanReplicationRefBeanDefinition); + } else if ("partition-lost-listeners".equals(cleanNodeName(childNode))) { + ManagedList listeners = parseListeners(childNode, CachePartitionLostListenerConfig.class); + cacheConfigBuilder.addPropertyValue("partitionLostListenerConfigs", listeners); } else if ("quorum-ref".equals(cleanNodeName(childNode))) { cacheConfigBuilder.addPropertyValue("quorumName", getTextContent(childNode)); + } else if ("partition-lost-listeners".equals(cleanNodeName(childNode))) { + ManagedList listeners = parseListeners(childNode, CachePartitionLostListenerConfig.class); + cacheConfigBuilder.addPropertyValue("partitionLostListenerConfigs", listeners); } } cacheConfigManagedMap.put(name, cacheConfigBuilder.getBeanDefinition()); diff --git a/hazelcast-spring/src/main/resources/hazelcast-spring-3.6.xsd b/hazelcast-spring/src/main/resources/hazelcast-spring-3.6.xsd index 9b6e8aaa9893..1a72e89b36db 100644 --- a/hazelcast-spring/src/main/resources/hazelcast-spring-3.6.xsd +++ b/hazelcast-spring/src/main/resources/hazelcast-spring-3.6.xsd @@ -434,6 +434,18 @@ + + + List of partition lost listeners + + + + + + + diff --git a/hazelcast-spring/src/test/java/com/hazelcast/spring/DummyCachePartitionLostListenerImpl.java b/hazelcast-spring/src/test/java/com/hazelcast/spring/DummyCachePartitionLostListenerImpl.java new file mode 100644 index 000000000000..296556b7231f --- /dev/null +++ b/hazelcast-spring/src/test/java/com/hazelcast/spring/DummyCachePartitionLostListenerImpl.java @@ -0,0 +1,12 @@ +package com.hazelcast.spring; + +import com.hazelcast.cache.impl.event.CachePartitionLostEvent; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; + + +public class DummyCachePartitionLostListenerImpl implements CachePartitionLostListener { + @Override + public void partitionLost(CachePartitionLostEvent event) { + + } +} diff --git a/hazelcast-spring/src/test/java/com/hazelcast/spring/context/TestJCache.java b/hazelcast-spring/src/test/java/com/hazelcast/spring/context/TestJCache.java index 5acdadf3b9b6..5fa021852303 100644 --- a/hazelcast-spring/src/test/java/com/hazelcast/spring/context/TestJCache.java +++ b/hazelcast-spring/src/test/java/com/hazelcast/spring/context/TestJCache.java @@ -16,33 +16,32 @@ package com.hazelcast.spring.context; +import com.hazelcast.config.CachePartitionLostListenerConfig; +import com.hazelcast.config.CacheSimpleConfig; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.DurationConfig; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig.ExpiryPolicyType; import com.hazelcast.config.Config; import com.hazelcast.config.EvictionConfig; -import com.hazelcast.config.CacheSimpleConfig; import com.hazelcast.config.EvictionPolicy; import com.hazelcast.config.InMemoryFormat; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.spring.CustomSpringJUnit4ClassRunner; import com.hazelcast.test.annotation.QuickTest; -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig; -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig; -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.DurationConfig; -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig.ExpiryPolicyType; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.test.context.ContextConfiguration; import javax.annotation.Resource; - import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -210,6 +209,23 @@ public void cacheConfigXmlTest_TimedEternalTouchedPolicyFactory() throws IOExcep assertEquals(ExpiryPolicyType.ETERNAL, timedExpiryPolicyFactoryConfig.getExpiryPolicyType()); } + + @Test + public void cacheConfigXmlTest_PartitionLostListener() throws IOException { + Config config = instance1.getConfig(); + + CacheSimpleConfig cacheWithPartitionLostListenerConfig = + config.getCacheConfig("cacheWithPartitionLostListener"); + List partitionLostListenerConfigs = + cacheWithPartitionLostListenerConfig.getPartitionLostListenerConfigs(); + + assertNotNull(partitionLostListenerConfigs); + assertEquals(1, partitionLostListenerConfigs.size()); + assertEquals(partitionLostListenerConfigs.get(0).getClassName(),"DummyCachePartitionLostListenerImpl"); + assertNotNull(partitionLostListenerConfigs); + assertEquals(1, partitionLostListenerConfigs.size()); + assertEquals(partitionLostListenerConfigs.get(0).getClassName(),"DummyCachePartitionLostListenerImpl"); + } @Test public void testCacheQuorumConfig() { diff --git a/hazelcast-spring/src/test/resources/com/hazelcast/spring/context/test-jcache-application-context.xml b/hazelcast-spring/src/test/resources/com/hazelcast/spring/context/test-jcache-application-context.xml index c2307a29482b..4086ef5793ae 100644 --- a/hazelcast-spring/src/test/resources/com/hazelcast/spring/context/test-jcache-application-context.xml +++ b/hazelcast-spring/src/test/resources/com/hazelcast/spring/context/test-jcache-application-context.xml @@ -102,6 +102,11 @@ + + + + + cacheQuorumRefString diff --git a/hazelcast/src/main/java/com/hazelcast/cache/ICache.java b/hazelcast/src/main/java/com/hazelcast/cache/ICache.java index f85f29803d65..fdcfc62e9eda 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/ICache.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/ICache.java @@ -16,6 +16,7 @@ package com.hazelcast.cache; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; import com.hazelcast.core.ICompletableFuture; import javax.cache.expiry.ExpiryPolicy; @@ -944,4 +945,32 @@ public interface ICache */ CacheStatistics getLocalCacheStatistics(); + /** + * Adds a CachePartitionLostListener. + *

+ * The addPartitionLostListener returns a register-id. This id is needed to remove the CachePartitionLostListener using the + * {@link #removePartitionLostListener(String)} method. + *

+ * There is no check for duplicate registrations, so if you register the listener twice, it will get events twice. + * IMPORTANT: Please @see com.hazelcast.partition.PartitionLostListener for weaknesses. + * IMPORTANT: Listeners registered from HazelcastClient may miss some of the cache partition lost events due + * to design limitations. + * + * @param listener the added CachePartitionLostListener. + * @return returns the registration id for the CachePartitionLostListener. + * @throws java.lang.NullPointerException if listener is null. + * @see #removePartitionLostListener(String) + */ + String addPartitionLostListener(CachePartitionLostListener listener); + + /** + * Removes the specified cache partition lost listener. + * Returns silently if there is no such listener added before. + * + * @param id id of registered listener. + * @return true if registration is removed, false otherwise. + * @throws java.lang.NullPointerException if the given id is null. + */ + boolean removePartitionLostListener(String id); + } diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheService.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheService.java index bf625322b244..1b9ed8d6f37c 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheService.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractCacheService.java @@ -16,6 +16,7 @@ package com.hazelcast.cache.impl; +import com.hazelcast.cache.impl.event.CachePartitionLostEventFilter; import com.hazelcast.cache.impl.operation.CacheDestroyOperation; import com.hazelcast.cache.impl.operation.PostJoinCacheOperation; import com.hazelcast.config.CacheConfig; @@ -25,6 +26,7 @@ import com.hazelcast.core.Member; import com.hazelcast.nio.IOUtil; import com.hazelcast.nio.serialization.Data; +import com.hazelcast.partition.InternalPartitionLostEvent; import com.hazelcast.partition.MigrationEndpoint; import com.hazelcast.spi.EventFilter; import com.hazelcast.spi.EventRegistration; @@ -32,6 +34,7 @@ import com.hazelcast.spi.NodeEngine; import com.hazelcast.spi.Operation; import com.hazelcast.spi.OperationService; +import com.hazelcast.spi.PartitionAwareService; import com.hazelcast.spi.PartitionMigrationEvent; import com.hazelcast.spi.PostJoinAwareService; import com.hazelcast.spi.QuorumAwareService; @@ -44,6 +47,7 @@ import java.io.Closeable; import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -51,7 +55,7 @@ import java.util.concurrent.ConcurrentMap; public abstract class AbstractCacheService - implements ICacheService, PostJoinAwareService, QuorumAwareService { + implements ICacheService, PostJoinAwareService, PartitionAwareService, QuorumAwareService { protected final ConcurrentMap configs = new ConcurrentHashMap(); protected final ConcurrentMap cacheContexts = new ConcurrentHashMap(); @@ -472,6 +476,42 @@ public Operation getPostJoinOperation() { return postJoinCacheOperation; } + + protected void publishCachePartitionLostEvent(String cacheName, int partitionId) { + final Collection registrations = new LinkedList(); + for (EventRegistration registration : getRegistrations(cacheName)) { + if (registration.getFilter() instanceof CachePartitionLostEventFilter) { + registrations.add(registration); + } + } + + if (registrations.isEmpty()) { + return; + } + final Member member = nodeEngine.getLocalMember(); + final CacheEventData eventData = new CachePartitionEventData(cacheName, partitionId, member); + final EventService eventService = nodeEngine.getEventService(); + + eventService.publishEvent(SERVICE_NAME, registrations, eventData, partitionId); + + } + + Collection getRegistrations(String cacheName) { + final EventService eventService = nodeEngine.getEventService(); + return eventService.getRegistrations(SERVICE_NAME, cacheName); + } + + @Override + public void onPartitionLost(InternalPartitionLostEvent partitionLostEvent) { + final int partitionId = partitionLostEvent.getPartitionId(); + for (CacheConfig config : getCacheConfigs()) { + final String cacheName = config.getName(); + if (config.getBackupCount() <= partitionLostEvent.getLostReplicaIndex()) { + publishCachePartitionLostEvent(cacheName, partitionId); + } + } + } + public void cacheEntryListenerRegistered(String name, CacheEntryListenerConfiguration cacheEntryListenerConfiguration) { CacheConfig cacheConfig = getCacheConfig(name); diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractInternalCacheProxy.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractInternalCacheProxy.java index a112d4c960fb..a4f5dd2d706e 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractInternalCacheProxy.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/AbstractInternalCacheProxy.java @@ -16,10 +16,17 @@ package com.hazelcast.cache.impl; +import com.hazelcast.cache.impl.event.CachePartitionLostEventFilter; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; +import com.hazelcast.cache.impl.event.InternalCachePartitionLostListenerAdapter; import com.hazelcast.cache.impl.operation.MutableOperation; import com.hazelcast.config.CacheConfig; +import com.hazelcast.config.CachePartitionLostListenerConfig; +import com.hazelcast.config.ListenerConfig; import com.hazelcast.core.HazelcastInstanceNotActiveException; +import com.hazelcast.nio.ClassLoaderUtil; import com.hazelcast.nio.serialization.Data; +import com.hazelcast.spi.EventFilter; import com.hazelcast.spi.InternalCompletableFuture; import com.hazelcast.spi.NodeEngine; import com.hazelcast.spi.Operation; @@ -31,8 +38,10 @@ import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.expiry.ExpiryPolicy; import java.util.Collection; +import java.util.EventListener; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -76,6 +85,19 @@ protected AbstractInternalCacheProxy(CacheConfig cacheConfig, NodeEngine nodeEng asyncListenerRegistrations = new ConcurrentHashMap(); syncListenerRegistrations = new ConcurrentHashMap(); syncLocks = new ConcurrentHashMap(); + + final List configs = cacheConfig.getPartitionLostListenerConfigs(); + for (CachePartitionLostListenerConfig listenerConfig : configs) { + final CachePartitionLostListener listener = initializeListener(listenerConfig); + if (listener != null) { + final InternalCachePartitionLostListenerAdapter listenerAdapter = + new InternalCachePartitionLostListenerAdapter(listener); + final EventFilter filter = new CachePartitionLostEventFilter(); + final ICacheService service = getService(); + service.getNodeEngine().getEventService().registerListener(AbstractCacheService.SERVICE_NAME, + name, filter, listenerAdapter); + } + } } protected InternalCompletableFuture invoke(Operation op, Data keyData, boolean completionOperation) { @@ -357,4 +379,20 @@ private void awaitLatch(CountDownLatch countDownLatch) { } } //endregion Listener operations + + private T initializeListener(ListenerConfig listenerConfig) { + T listener = null; + if (listenerConfig.getImplementation() != null) { + listener = (T) listenerConfig.getImplementation(); + } else if (listenerConfig.getClassName() != null) { + try { + return ClassLoaderUtil + .newInstance(getNodeEngine().getConfigClassLoader(), listenerConfig.getClassName()); + } catch (Exception e) { + throw ExceptionUtil.rethrow(e); + } + } + return listener; + } + } diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheEventType.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheEventType.java index 326d96bebb3d..fb2c9cf08112 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheEventType.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheEventType.java @@ -61,9 +61,14 @@ public enum CacheEventType { COMPLETED(7), /** - * An event type indicationg that the expiration time of cache record has been updated + * An event type indicating that the expiration time of cache record has been updated */ - EXPIRATION_TIME_UPDATED(8); + EXPIRATION_TIME_UPDATED(8), + + /** + * An event type indicating that partition lost is detected in given cache with name + */ + PARTITION_LOST(9); private int type; diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/CachePartitionEventData.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/CachePartitionEventData.java new file mode 100644 index 000000000000..5458646b8ca5 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/CachePartitionEventData.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.cache.impl; + +import com.hazelcast.core.Member; +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; + +import java.io.IOException; + +public class CachePartitionEventData extends CacheEventDataImpl implements CacheEventData { + + private int partitionId; + private Member member; + + public CachePartitionEventData(String name, int partitionId, Member member) { + super(name, CacheEventType.PARTITION_LOST, null, null, null, false); + this.partitionId = partitionId; + this.member = member; + } + + public Member getMember() { + return member; + } + + public int getPartitionId() { + return partitionId; + } + + @Override + public void writeData(ObjectDataOutput out) + throws IOException { + super.writeData(out); + out.writeInt(partitionId); + out.writeObject(member); + } + + @Override + public void readData(ObjectDataInput in) + throws IOException { + super.readData(in); + partitionId = in.readInt(); + member = in.readObject(); + } + + @Override + public String toString() { + return "CachePartitionEventData{" + + super.toString() + + ", partitionId=" + partitionId + + '}'; + } +} + diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/CachePortableHook.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/CachePortableHook.java index ddaf7a43d665..b035221ab769 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/CachePortableHook.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/CachePortableHook.java @@ -18,6 +18,7 @@ import com.hazelcast.cache.impl.client.CacheAddEntryListenerRequest; import com.hazelcast.cache.impl.client.CacheAddInvalidationListenerRequest; +import com.hazelcast.cache.impl.client.CacheAddPartitionLostListenerRequest; import com.hazelcast.cache.impl.client.CacheBatchInvalidationMessage; import com.hazelcast.cache.impl.client.CacheClearRequest; import com.hazelcast.cache.impl.client.CacheContainsKeyRequest; @@ -37,6 +38,7 @@ import com.hazelcast.cache.impl.client.CachePutRequest; import com.hazelcast.cache.impl.client.CacheRemoveEntryListenerRequest; import com.hazelcast.cache.impl.client.CacheRemoveInvalidationListenerRequest; +import com.hazelcast.cache.impl.client.CacheRemovePartitionLostListenerRequest; import com.hazelcast.cache.impl.client.CacheRemoveRequest; import com.hazelcast.cache.impl.client.CacheReplaceRequest; import com.hazelcast.cache.impl.client.CacheSingleInvalidationMessage; @@ -88,8 +90,10 @@ public class CachePortableHook public static final int REMOVE_ENTRY_LISTENER = 23; public static final int LISTENER_REGISTRATION = 24; public static final int DESTROY_CACHE = 25; + public static final int ADD_CACHE_PARTITION_LOST_LISTENER = 26; + public static final int REMOVE_CACHE_PARTITION_LOST_LISTENER = 27; - public static final int LEN = 26; + public static final int LEN = 28; public int getFactoryId() { return F_ID; @@ -225,6 +229,16 @@ public Portable createNew(Integer arg) { return new CacheDestroyRequest(); } }; + constructors[ADD_CACHE_PARTITION_LOST_LISTENER] = new ConstructorFunction() { + public Portable createNew(Integer arg) { + return new CacheAddPartitionLostListenerRequest(); + } + }; + constructors[REMOVE_CACHE_PARTITION_LOST_LISTENER] = new ConstructorFunction() { + public Portable createNew(Integer arg) { + return new CacheRemovePartitionLostListenerRequest(); + } + }; } public Portable create(int classId) { diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheProxy.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheProxy.java index dd5f1056a073..00f356dc981d 100644 --- a/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheProxy.java +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/CacheProxy.java @@ -16,11 +16,16 @@ package com.hazelcast.cache.impl; +import com.hazelcast.cache.impl.event.CachePartitionLostEventFilter; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; +import com.hazelcast.cache.impl.event.InternalCachePartitionLostListenerAdapter; import com.hazelcast.cache.impl.operation.CacheListenerRegistrationOperation; import com.hazelcast.config.CacheConfig; import com.hazelcast.core.Member; import com.hazelcast.logging.ILogger; import com.hazelcast.nio.serialization.Data; +import com.hazelcast.spi.EventFilter; +import com.hazelcast.spi.EventRegistration; import com.hazelcast.spi.InternalCompletableFuture; import com.hazelcast.spi.NodeEngine; import com.hazelcast.spi.Operation; @@ -349,4 +354,26 @@ public Iterator> iterator() { ensureOpen(); return new ClusterWideIterator(this); } + + @Override + public String addPartitionLostListener(CachePartitionLostListener listener) { + checkNotNull(listener, "CachePartitionLostListener can't be null"); + final InternalCachePartitionLostListenerAdapter listenerAdapter = + new InternalCachePartitionLostListenerAdapter(listener); + + final EventFilter filter = new CachePartitionLostEventFilter(); + final ICacheService service = getService(); + final EventRegistration registration = service.getNodeEngine().getEventService(). + registerListener(AbstractCacheService.SERVICE_NAME, name, filter, listenerAdapter); + return registration.getId(); + } + + @Override + public boolean removePartitionLostListener(String id) { + checkNotNull(id, "Listener id should not be null!"); + final ICacheService service = getService(); + return service.getNodeEngine().getEventService(). + deregisterListener(AbstractCacheService.SERVICE_NAME, + name, id); + } } diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/client/CacheAddPartitionLostListenerRequest.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/client/CacheAddPartitionLostListenerRequest.java new file mode 100644 index 000000000000..9fb0cbf1a57e --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/client/CacheAddPartitionLostListenerRequest.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.cache.impl.client; + +import com.hazelcast.cache.impl.AbstractCacheService; +import com.hazelcast.cache.impl.CachePortableHook; +import com.hazelcast.cache.impl.CacheService; +import com.hazelcast.cache.impl.ICacheService; +import com.hazelcast.cache.impl.event.CachePartitionLostEvent; +import com.hazelcast.cache.impl.event.CachePartitionLostEventFilter; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; +import com.hazelcast.cache.impl.event.InternalCachePartitionLostListenerAdapter; +import com.hazelcast.client.ClientEndpoint; +import com.hazelcast.client.impl.client.CallableClientRequest; +import com.hazelcast.client.impl.client.RetryableRequest; +import com.hazelcast.nio.serialization.PortableReader; +import com.hazelcast.nio.serialization.PortableWriter; +import com.hazelcast.spi.EventFilter; +import com.hazelcast.spi.EventRegistration; +import com.hazelcast.spi.impl.PortableCachePartitionLostEvent; + +import java.io.IOException; +import java.security.Permission; + +public class CacheAddPartitionLostListenerRequest extends CallableClientRequest + implements RetryableRequest { + + + private String name; + + public CacheAddPartitionLostListenerRequest() { + } + + public CacheAddPartitionLostListenerRequest(String name) { + this.name = name; + } + + @Override + public Object call() { + final ClientEndpoint endpoint = getEndpoint(); + + final CachePartitionLostListener listener = new CachePartitionLostListener() { + @Override + public void partitionLost(CachePartitionLostEvent event) { + if (endpoint.isAlive()) { + final PortableCachePartitionLostEvent portableEvent = + new PortableCachePartitionLostEvent(event.getPartitionId(), event.getMember().getUuid()); + endpoint.sendEvent(null, portableEvent, getCallId()); + } + } + }; + + final InternalCachePartitionLostListenerAdapter listenerAdapter = + new InternalCachePartitionLostListenerAdapter(listener); + final EventFilter filter = new CachePartitionLostEventFilter(); + final ICacheService service = getService(); + final EventRegistration registration = service.getNodeEngine(). + getEventService().registerListener(AbstractCacheService.SERVICE_NAME, + name, filter, listenerAdapter); + final String registrationId = registration.getId(); + endpoint.setListenerRegistration(CacheService.SERVICE_NAME, name, registrationId); + + return registrationId; + } + + + @Override + public void write(PortableWriter writer) throws IOException { + writer.writeUTF("name", name); + } + + @Override + public void read(PortableReader reader) throws IOException { + name = reader.readUTF("name"); + } + + @Override + public String getServiceName() { + return ICacheService.SERVICE_NAME; + } + @Override + public String getMethodName() { + return "addCachePartitionLostListener"; + } + + @Override + public int getFactoryId() { + return CachePortableHook.F_ID; + } + + @Override + public int getClassId() { + return CachePortableHook.ADD_CACHE_PARTITION_LOST_LISTENER; + } + + @Override + public Permission getRequiredPermission() { + return null; + } + + @Override + public String getDistributedObjectName() { + return name; + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/client/CacheRemovePartitionLostListenerRequest.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/client/CacheRemovePartitionLostListenerRequest.java new file mode 100644 index 000000000000..69c847bf735e --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/client/CacheRemovePartitionLostListenerRequest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.cache.impl.client; + +import com.hazelcast.cache.impl.AbstractCacheService; +import com.hazelcast.cache.impl.CachePortableHook; +import com.hazelcast.cache.impl.CacheService; +import com.hazelcast.cache.impl.ICacheService; +import com.hazelcast.client.impl.client.BaseClientRemoveListenerRequest; + +import java.security.Permission; + +public class CacheRemovePartitionLostListenerRequest extends BaseClientRemoveListenerRequest { + + + public CacheRemovePartitionLostListenerRequest() { + } + + public CacheRemovePartitionLostListenerRequest(String name, String registrationId) { + super(name, registrationId); + } + + public Object call() throws Exception { + final ICacheService service = getService(); + return service.getNodeEngine().getEventService(). + deregisterListener(AbstractCacheService.SERVICE_NAME, + name, registrationId); + } + + public String getServiceName() { + return CacheService.SERVICE_NAME; + } + + public int getFactoryId() { + return CachePortableHook.F_ID; + } + + public int getClassId() { + return CachePortableHook.REMOVE_CACHE_PARTITION_LOST_LISTENER; + } + + @Override + public Permission getRequiredPermission() { + return null; + } + + @Override + public String getMethodName() { + return "removeCachePartitionLostListener"; + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/event/AbstractICacheEvent.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/AbstractICacheEvent.java new file mode 100644 index 000000000000..744639ba38ef --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/AbstractICacheEvent.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.cache.impl.event; + +import com.hazelcast.cache.impl.CacheEventType; +import com.hazelcast.core.EntryEventType; +import com.hazelcast.core.Member; + +import java.util.EventObject; + +/** + * The abstract class for a JCache event {@link com.hazelcast.cache.impl.event.ICacheEvent}. + * @since 3.6 + */ +public abstract class AbstractICacheEvent extends EventObject implements ICacheEvent { + + protected final String name; + + private final CacheEventType cacheEventType; + + private final Member member; + + /** + * Constructs a prototypical Cache Event. + * + * @param source The object on which the Event initially occurred. + * @param member The interface to the cluster member (node). + * @param eventType The event type as an enum {@link EntryEventType} integer. + * @throws IllegalArgumentException if source is null. + */ + public AbstractICacheEvent(Object source, Member member, int eventType) { + super(source); + this.name = (String) source; + this.member = member; + this.cacheEventType = CacheEventType.getByType(eventType); + } + + + /** + * Returns the object on which the event initially occurred. + * + * @return The object on which the event initially occurred. + */ + @Override + public Object getSource() { + return name; + } + + /** + * Returns the member that fired this event. + * + * @return The member that fired this event. + */ + @Override + public Member getMember() { + return member; + } + + /** + * Returns the event type {@link EntryEventType}. + * + * @return The event type {@link EntryEventType}. + */ + @Override + public CacheEventType getEventType() { + return cacheEventType; + } + + /** + * Returns the name of the cache for this event. + * + * @return The name of the cache for this event. + */ + @Override + public String getName() { + return name; + } + + /** + * Returns a String representation of this event. + * + * @return A String representation of this event. + */ + @Override + public String toString() { + return String.format("entryEventType=%s, member=%s, name='%s'", + cacheEventType, member, name); + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostEvent.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostEvent.java new file mode 100644 index 000000000000..428879dbd352 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostEvent.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.cache.impl.event; + +import com.hazelcast.core.Member; + +/** + * Used for providing information about the lost partition for a cache + * + * @see CachePartitionLostEvent + * @since 3.6 + */ +public class CachePartitionLostEvent extends AbstractICacheEvent { + + private static final long serialVersionUID = -7445714640964238109L; + + private final int partitionId; + + public CachePartitionLostEvent(Object source, Member member, int eventType, int partitionId) { + super(source, member, eventType); + this.partitionId = partitionId; + } + + /** + * Returns the partition id that has been lost for the given cache + * + * @return the partition id that has been lost for the given cache + */ + public int getPartitionId() { + return partitionId; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + + super.toString() + + ", partitionId=" + partitionId + + '}'; + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostEventFilter.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostEventFilter.java new file mode 100644 index 000000000000..a27d07d6d2c2 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostEventFilter.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.cache.impl.event; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.DataSerializable; +import com.hazelcast.spi.EventFilter; + +import java.io.IOException; + +/** + * Used to filter partition lost listener events + * @since 3.6 + */ +public class CachePartitionLostEventFilter implements EventFilter, DataSerializable { + + @Override + public boolean eval(Object arg) { + return false; + } + + @Override + public void writeData(ObjectDataOutput out) throws IOException { + } + + @Override + public void readData(ObjectDataInput in) throws IOException { + } + + @Override + public boolean equals(Object obj) { + return obj instanceof CachePartitionLostEventFilter; + } + + @Override + public int hashCode() { + return 0; + } + +} diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostListener.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostListener.java new file mode 100644 index 000000000000..4bc13c5b1a0a --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/CachePartitionLostListener.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.cache.impl.event; + +import java.util.EventListener; + +/** + * Invoked when owner and all backups of a partition is lost for a specific cache + * @see CachePartitionLostEvent + * @since 3.6 + */ +public interface CachePartitionLostListener extends EventListener { + + /** + * Invoked when owner and all backups of a partition is lost for a specific cache + * + * @param event the event object that contains cache name and lost partition id + */ + void partitionLost(CachePartitionLostEvent event); + +} diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/event/ICacheEvent.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/ICacheEvent.java new file mode 100644 index 000000000000..c879e468be80 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/ICacheEvent.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.cache.impl.event; + +import com.hazelcast.cache.impl.CacheEventType; +import com.hazelcast.core.Member; + +/** + * Cache events common contract. + * @since 3.6 + */ +public interface ICacheEvent { + + /** + * Returns the member that fired this event. + * + * @return the member that fired this event. + */ + Member getMember(); + + /** + * Return the event type + * + * @return event type + */ + CacheEventType getEventType(); + + /** + * Returns the name of the cache for this event. + * + * @return name of the cache for this event. + */ + String getName(); + +} diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/event/InternalCachePartitionLostListenerAdapter.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/InternalCachePartitionLostListenerAdapter.java new file mode 100644 index 000000000000..e825016ba7b4 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/InternalCachePartitionLostListenerAdapter.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.cache.impl.event; + +import com.hazelcast.cache.impl.CacheEventListener; +import com.hazelcast.cache.impl.CacheEventType; +import com.hazelcast.cache.impl.CachePartitionEventData; +import com.hazelcast.spi.annotation.PrivateApi; + +@PrivateApi +public class InternalCachePartitionLostListenerAdapter implements CacheEventListener { + + private final CachePartitionLostListener partitionLostListener; + + public InternalCachePartitionLostListenerAdapter(CachePartitionLostListener partitionLostListener) { + this.partitionLostListener = partitionLostListener; + } + + @Override + public void handleEvent(Object eventObject) { + final CachePartitionEventData eventData = (CachePartitionEventData) eventObject; + final CachePartitionLostEvent event = new CachePartitionLostEvent(eventData.getName(), + eventData.getMember(), CacheEventType.PARTITION_LOST.getType(), + eventData.getPartitionId()); + partitionLostListener.partitionLost(event); + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/cache/impl/event/package-info.java b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/package-info.java new file mode 100644 index 000000000000..0ed23009e7d6 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/cache/impl/event/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + *

+ * Event classes to define listenters for JCache other than EntryListener + * @since 3.6 + *

+ */ +package com.hazelcast.cache.impl.event; diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/EventMessageConst.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/EventMessageConst.java index c73f8e8da6b3..ec6c564e40f5 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/EventMessageConst.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/EventMessageConst.java @@ -44,4 +44,6 @@ public final class EventMessageConst { //ENTERPRISE public static final int EVENT_QUERYCACHESINGLE = 212; public static final int EVENT_QUERYCACHEBATCH = 213; + + public static final int EVENT_CACHEPARTITIONLOST = 214; } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheAddPartitionLostListenerMessageTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheAddPartitionLostListenerMessageTask.java new file mode 100644 index 000000000000..ede3c6e109c3 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheAddPartitionLostListenerMessageTask.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.client.impl.protocol.task.cache; + +import com.hazelcast.cache.impl.AbstractCacheService; +import com.hazelcast.cache.impl.CacheService; +import com.hazelcast.cache.impl.event.CachePartitionLostEvent; +import com.hazelcast.cache.impl.event.CachePartitionLostEventFilter; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; +import com.hazelcast.cache.impl.event.InternalCachePartitionLostListenerAdapter; +import com.hazelcast.client.ClientEndpoint; +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.client.impl.protocol.codec.CacheAddPartitionLostListenerCodec; +import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask; +import com.hazelcast.instance.Node; +import com.hazelcast.nio.Connection; +import com.hazelcast.spi.EventFilter; +import com.hazelcast.spi.EventRegistration; + +import java.security.Permission; + +public class CacheAddPartitionLostListenerMessageTask + extends AbstractCallableMessageTask { + + + public CacheAddPartitionLostListenerMessageTask(ClientMessage clientMessage, Node node, Connection connection) { + super(clientMessage, node, connection); + } + + @Override + protected Object call() { + final ClientEndpoint endpoint = getEndpoint(); + + final CachePartitionLostListener listener = new CachePartitionLostListener() { + @Override + public void partitionLost(CachePartitionLostEvent event) { + if (endpoint.isAlive()) { + ClientMessage eventMessage = + CacheAddPartitionLostListenerCodec.encodeCachePartitionLostEvent(event.getPartitionId(), + event.getMember().getUuid()); + sendClientMessage(null, eventMessage); + } + } + }; + + final InternalCachePartitionLostListenerAdapter listenerAdapter = + new InternalCachePartitionLostListenerAdapter(listener); + final EventFilter filter = new CachePartitionLostEventFilter(); + final CacheService service = getService(CacheService.SERVICE_NAME); + final EventRegistration registration = service.getNodeEngine(). + getEventService().registerListener(AbstractCacheService.SERVICE_NAME, + parameters.name, filter, listenerAdapter); + final String registrationId = registration.getId(); + endpoint.setListenerRegistration(CacheService.SERVICE_NAME, parameters.name, registrationId); + return registrationId; + + } + + @Override + protected CacheAddPartitionLostListenerCodec.RequestParameters decodeClientMessage(ClientMessage clientMessage) { + return CacheAddPartitionLostListenerCodec.decodeRequest(clientMessage); + } + + @Override + protected ClientMessage encodeResponse(Object response) { + return CacheAddPartitionLostListenerCodec.encodeResponse((String) response); + } + + @Override + public String getServiceName() { + return CacheService.SERVICE_NAME; + } + + @Override + public String getMethodName() { + return "addCachePartitionLostListener"; + } + + @Override + public Object[] getParameters() { + return null; + } + + @Override + public Permission getRequiredPermission() { + return null; + } + + @Override + public String getDistributedObjectName() { + return parameters.name; + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheRemovePartitionLostListenerMessageTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheRemovePartitionLostListenerMessageTask.java new file mode 100644 index 000000000000..188bdaaecbfc --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/cache/CacheRemovePartitionLostListenerMessageTask.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.client.impl.protocol.task.cache; + +import com.hazelcast.cache.impl.AbstractCacheService; +import com.hazelcast.cache.impl.CacheService; +import com.hazelcast.cache.impl.ICacheService; +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.client.impl.protocol.codec.CacheRemovePartitionLostListenerCodec; +import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask; +import com.hazelcast.instance.Node; +import com.hazelcast.nio.Connection; + +import java.security.Permission; + +public class CacheRemovePartitionLostListenerMessageTask + extends AbstractCallableMessageTask { + + + public CacheRemovePartitionLostListenerMessageTask(ClientMessage clientMessage, Node node, Connection connection) { + super(clientMessage, node, connection); + } + + @Override + protected Object call() { + ICacheService service = getService(CacheService.SERVICE_NAME); + return service.getNodeEngine().getEventService().deregisterListener(AbstractCacheService.SERVICE_NAME, + parameters.name, parameters.registrationId); + } + + @Override + protected CacheRemovePartitionLostListenerCodec.RequestParameters decodeClientMessage(ClientMessage clientMessage) { + return CacheRemovePartitionLostListenerCodec.decodeRequest(clientMessage); + } + + @Override + protected ClientMessage encodeResponse(Object response) { + return CacheRemovePartitionLostListenerCodec.encodeResponse((Boolean) response); + } + + @Override + public String getServiceName() { + return CacheService.SERVICE_NAME; + } + + @Override + public String getDistributedObjectName() { + return parameters.name; + } + + @Override + public Permission getRequiredPermission() { + return null; + } + + @Override + public String getMethodName() { + return "removeCachePartitionLostListener"; + } + + @Override + public Object[] getParameters() { + return new Object[]{parameters.registrationId}; + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/template/CacheCodecTemplate.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/template/CacheCodecTemplate.java index f87a920e84ae..7c93c1b58015 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/template/CacheCodecTemplate.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/template/CacheCodecTemplate.java @@ -106,4 +106,11 @@ public interface CacheCodecTemplate { @Request(id = 25, retryable = true, response = ResponseMessageConst.INTEGER) void size(String name); + @Request(id = 26, retryable = true, response = ResponseMessageConst.STRING, + event = EventMessageConst.EVENT_CACHEPARTITIONLOST) + void addPartitionLostListener(String name); + + @Request(id = 27, retryable = false, response = ResponseMessageConst.BOOLEAN) + void removePartitionLostListener(String name, String registrationId); + } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/template/EventResponseTemplate.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/template/EventResponseTemplate.java index 3542a9c2e5b0..69244d36594e 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/template/EventResponseTemplate.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/template/EventResponseTemplate.java @@ -79,5 +79,7 @@ void Entry(@Nullable Data key, @Nullable Data value, @Nullable Data oldValue, @N @EventResponse(EventMessageConst.EVENT_QUERYCACHEBATCH) void QueryCacheBatch(List events, String source, int partitionId); + @EventResponse(EventMessageConst.EVENT_CACHEPARTITIONLOST) + void CachePartitionLost(int partitionId, String uuid); } diff --git a/hazelcast/src/main/java/com/hazelcast/config/CacheConfig.java b/hazelcast/src/main/java/com/hazelcast/config/CacheConfig.java index a9b4a6bc2053..598b7a7a9245 100644 --- a/hazelcast/src/main/java/com/hazelcast/config/CacheConfig.java +++ b/hazelcast/src/main/java/com/hazelcast/config/CacheConfig.java @@ -16,6 +16,9 @@ package com.hazelcast.config; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.DurationConfig; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig; +import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig.ExpiryPolicyType; import com.hazelcast.nio.ClassLoaderUtil; import com.hazelcast.nio.ObjectDataInput; import com.hazelcast.nio.ObjectDataOutput; @@ -33,10 +36,8 @@ import javax.cache.expiry.ModifiedExpiryPolicy; import javax.cache.expiry.TouchedExpiryPolicy; import java.io.IOException; - -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig; -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.DurationConfig; -import com.hazelcast.config.CacheSimpleConfig.ExpiryPolicyFactoryConfig.TimedExpiryPolicyFactoryConfig.ExpiryPolicyType; +import java.util.ArrayList; +import java.util.List; import static com.hazelcast.config.CacheSimpleConfig.DEFAULT_BACKUP_COUNT; import static com.hazelcast.config.CacheSimpleConfig.DEFAULT_IN_MEMORY_FORMAT; @@ -68,7 +69,7 @@ public class CacheConfig private CacheEvictionConfig evictionConfig = new CacheEvictionConfig(); private WanReplicationRef wanReplicationRef; - + private List partitionLostListenerConfigs; private String quorumName; public CacheConfig() { @@ -95,6 +96,10 @@ public CacheConfig(CompleteConfiguration configuration) { if (config.wanReplicationRef != null) { this.wanReplicationRef = new WanReplicationRef(config.wanReplicationRef); } + if (config.partitionLostListenerConfigs != null) { + this.partitionLostListenerConfigs = new ArrayList( + config.partitionLostListenerConfigs); + } this.quorumName = config.quorumName; } } @@ -144,6 +149,9 @@ public CacheConfig(CacheSimpleConfig simpleConfig) throws Exception { listenerFactory, filterFactory, isOldValueRequired, synchronous); addCacheEntryListenerConfiguration(listenerConfiguration); } + for (CachePartitionLostListenerConfig listenerConfig : simpleConfig.getPartitionLostListenerConfigs()) { + getPartitionLostListenerConfigs().add(listenerConfig); + } this.quorumName = simpleConfig.getQuorumName(); } @@ -380,6 +388,28 @@ public CacheConfig setWanReplicationRef(WanReplicationRef wanReplicationRef) { return this; } + /** + * Gets the partition lost listener references added to cache config + * + * @return List of CachePartitionLostListenerConfig. + */ + public List getPartitionLostListenerConfigs() { + if (partitionLostListenerConfigs == null) { + partitionLostListenerConfigs = new ArrayList(); + } + return partitionLostListenerConfigs; + } + + /** + * Sets the Wan target replication reference. + * + * @param partitionLostListenerConfigs CachePartitionLostListenerConfig list. + */ + public CacheConfig setPartitionLostListenerConfigs(List partitionLostListenerConfigs) { + this.partitionLostListenerConfigs = partitionLostListenerConfigs; + return this; + } + /** * Gets the data type that will be used for storing records. * diff --git a/hazelcast/src/main/java/com/hazelcast/config/CachePartitionLostListenerConfig.java b/hazelcast/src/main/java/com/hazelcast/config/CachePartitionLostListenerConfig.java new file mode 100644 index 000000000000..ad60c1d41816 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/config/CachePartitionLostListenerConfig.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.config; + +import com.hazelcast.cache.impl.event.CachePartitionLostListener; + +import java.io.Serializable; + +; + +/** + * Configuration for CachePartitionLostListener + * @see CachePartitionLostListener + */ +public class CachePartitionLostListenerConfig + extends ListenerConfig implements Serializable { + + private CachePartitionLostListenerConfigReadOnly readOnly; + + public CachePartitionLostListenerConfig() { + } + + public CachePartitionLostListenerConfig(String className) { + super(className); + } + + public CachePartitionLostListenerConfig(CachePartitionLostListener implementation) { + super(implementation); + } + + public CachePartitionLostListenerConfig(CachePartitionLostListenerConfig config) { + implementation = config.getImplementation(); + className = config.getClassName(); + } + + public CachePartitionLostListenerConfigReadOnly getAsReadOnly() { + if (readOnly == null) { + readOnly = new CachePartitionLostListenerConfigReadOnly(this); + } + return readOnly; + } + + public CachePartitionLostListener getImplementation() { + return (CachePartitionLostListener) implementation; + } + + public CachePartitionLostListenerConfig setImplementation(final CachePartitionLostListener implementation) { + super.setImplementation(implementation); + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + CachePartitionLostListenerConfig that = (CachePartitionLostListenerConfig) o; + + if (className != null ? !className.equals(that.className) : that.className != null) { + return false; + } + return !(implementation != null ? !implementation.equals(that.implementation) : that.implementation != null); + + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (className != null ? className.hashCode() : 0); + result = 31 * result + (implementation != null ? implementation.hashCode() : 0); + return result; + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/config/CachePartitionLostListenerConfigReadOnly.java b/hazelcast/src/main/java/com/hazelcast/config/CachePartitionLostListenerConfigReadOnly.java new file mode 100644 index 000000000000..4eb7c6a5de49 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/config/CachePartitionLostListenerConfigReadOnly.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.config; + +import com.hazelcast.cache.impl.event.CachePartitionLostListener; + +import java.util.EventListener; + +/** + * Read-Only Configuration for CachePartitionLostListener + * @see CachePartitionLostListener + */ +public class CachePartitionLostListenerConfigReadOnly + extends CachePartitionLostListenerConfig { + + public CachePartitionLostListenerConfigReadOnly(CachePartitionLostListenerConfig config) { + super(config); + } + + public CachePartitionLostListener getImplementation() { + return (CachePartitionLostListener) implementation; + } + + public ListenerConfig setClassName(String className) { + throw new UnsupportedOperationException("this config is read-only"); + } + + public ListenerConfig setImplementation(EventListener implementation) { + throw new UnsupportedOperationException("this config is read-only"); + } + + public CachePartitionLostListenerConfig setImplementation(CachePartitionLostListener implementation) { + throw new UnsupportedOperationException("this config is read-only"); + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/config/CacheSimpleConfig.java b/hazelcast/src/main/java/com/hazelcast/config/CacheSimpleConfig.java index 5857a3c0590d..cd301246349c 100644 --- a/hazelcast/src/main/java/com/hazelcast/config/CacheSimpleConfig.java +++ b/hazelcast/src/main/java/com/hazelcast/config/CacheSimpleConfig.java @@ -88,6 +88,8 @@ public class CacheSimpleConfig { private String quorumName; + private List partitionLostListenerConfigs; + public CacheSimpleConfig(CacheSimpleConfig cacheSimpleConfig) { this.name = cacheSimpleConfig.name; this.keyType = cacheSimpleConfig.keyType; @@ -108,6 +110,8 @@ public CacheSimpleConfig(CacheSimpleConfig cacheSimpleConfig) { this.evictionConfig = cacheSimpleConfig.evictionConfig; } this.wanReplicationRef = cacheSimpleConfig.wanReplicationRef; + this.partitionLostListenerConfigs = + new ArrayList(cacheSimpleConfig.getPartitionLostListenerConfigs()); this.quorumName = cacheSimpleConfig.quorumName; } @@ -503,6 +507,39 @@ public CacheSimpleConfig setQuorumName(String quorumName) { return this; } + /** + * Gets the partition lost listener references added to cache config + * + * @return List of CachePartitionLostListenerConfig. + */ + public List getPartitionLostListenerConfigs() { + if (partitionLostListenerConfigs == null) { + partitionLostListenerConfigs = new ArrayList(); + } + return partitionLostListenerConfigs; + } + + /** + * Sets the PartitionLostListenerConfigs + * + * @param partitionLostListenerConfigs CachePartitionLostListenerConfig list. + */ + public CacheSimpleConfig setPartitionLostListenerConfigs( + List partitionLostListenerConfigs) { + this.partitionLostListenerConfigs = partitionLostListenerConfigs; + return this; + } + + /** + * Adds the CachePartitionLostListenerConfig to partitionLostListenerConfigs + * + * @param listenerConfig CachePartitionLostListenerConfig to be added. + */ + public CacheSimpleConfig addCachePartitionLostListenerConfig(CachePartitionLostListenerConfig listenerConfig) { + getPartitionLostListenerConfigs().add(listenerConfig); + return this; + } + /** * Represents configuration for "ExpiryPolicyFactory". */ diff --git a/hazelcast/src/main/java/com/hazelcast/config/ConfigXmlGenerator.java b/hazelcast/src/main/java/com/hazelcast/config/ConfigXmlGenerator.java index 62f5169b8aff..8d21479765bf 100644 --- a/hazelcast/src/main/java/com/hazelcast/config/ConfigXmlGenerator.java +++ b/hazelcast/src/main/java/com/hazelcast/config/ConfigXmlGenerator.java @@ -454,6 +454,9 @@ private void cacheConfigXmlGenerator(StringBuilder xml, Config config) { wanReplicationConfigXmlGenerator(xml, c.getWanReplicationRef()); + cachePartitionLostListenerConfigXmlGenerator(xml, + c.getPartitionLostListenerConfigs()); + evictionConfigXmlGenerator(xml, c.getEvictionConfig()); if (c.getQuorumName() != null) { @@ -464,6 +467,21 @@ private void cacheConfigXmlGenerator(StringBuilder xml, Config config) { } } + private void cachePartitionLostListenerConfigXmlGenerator(StringBuilder xml, List configs) { + if (!configs.isEmpty()) { + xml.append(""); + for (CachePartitionLostListenerConfig c : configs) { + xml.append(""); + final String clazz = c.getImplementation() + != null ? c.getImplementation().getClass().getName() : c.getClassName(); + xml.append(clazz); + xml.append(""); + } + xml.append(""); + } + } + + private void mapPartitionStrategyConfigXmlGenerator(StringBuilder xml, MapConfig m) { if (m.getPartitioningStrategyConfig() != null) { xml.append(""); diff --git a/hazelcast/src/main/java/com/hazelcast/config/XmlConfigBuilder.java b/hazelcast/src/main/java/com/hazelcast/config/XmlConfigBuilder.java index 98187b5cc087..0c5e29fab36c 100644 --- a/hazelcast/src/main/java/com/hazelcast/config/XmlConfigBuilder.java +++ b/hazelcast/src/main/java/com/hazelcast/config/XmlConfigBuilder.java @@ -1033,6 +1033,8 @@ private void handleCache(final org.w3c.dom.Node node) cacheConfig.setEvictionConfig(getEvictionConfig(n)); } else if ("quorum-ref".equals(nodeName)) { cacheConfig.setQuorumName(value); + } else if ("partition-lost-listeners".equals(nodeName)) { + cachePartitionLostListenerHandle(n, cacheConfig); } } this.config.addCacheConfig(cacheConfig); @@ -1170,6 +1172,16 @@ private void cacheWanReplicationRefHandle(Node n, CacheSimpleConfig cacheConfig) cacheConfig.setWanReplicationRef(wanReplicationRef); } + private void cachePartitionLostListenerHandle(Node n, CacheSimpleConfig cacheConfig) { + for (org.w3c.dom.Node listenerNode : new IterableNodeList(n.getChildNodes())) { + if ("partition-lost-listener".equals(cleanNodeName(listenerNode))) { + String listenerClass = getTextContent(listenerNode); + cacheConfig.addCachePartitionLostListenerConfig( + new CachePartitionLostListenerConfig(listenerClass)); + } + } + } + private void cacheListenerHandle(Node n, CacheSimpleConfig cacheSimpleConfig) { for (org.w3c.dom.Node listenerNode : new IterableNodeList(n.getChildNodes())) { if ("cache-entry-listener".equals(cleanNodeName(listenerNode))) { diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/PortableCachePartitionLostEvent.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/PortableCachePartitionLostEvent.java new file mode 100644 index 000000000000..44a9d9563a61 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/PortableCachePartitionLostEvent.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.spi.impl; + +import com.hazelcast.nio.serialization.Portable; +import com.hazelcast.nio.serialization.PortableReader; +import com.hazelcast.nio.serialization.PortableWriter; + +import java.io.IOException; + +public class PortableCachePartitionLostEvent implements Portable { + + private int partitionId; + + private String uuid; + + public PortableCachePartitionLostEvent() { + } + + public PortableCachePartitionLostEvent(int partitionId, String uuid) { + this.partitionId = partitionId; + this.uuid = uuid; + } + + public int getPartitionId() { + return partitionId; + } + + public String getUuid() { + return uuid; + } + + @Override + public int getFactoryId() { + return SpiPortableHook.ID; + } + + @Override + public int getClassId() { + return SpiPortableHook.CACHE_PARTITION_LOST_EVENT; + } + + @Override + public void writePortable(PortableWriter writer) + throws IOException { + writer.writeInt("p", partitionId); + writer.writeUTF("u", uuid); + + } + + @Override + public void readPortable(PortableReader reader) + throws IOException { + partitionId = reader.readInt("p"); + uuid = reader.readUTF("u"); + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/SpiPortableHook.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/SpiPortableHook.java index b105af866446..4de7218ad86f 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/SpiPortableHook.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/SpiPortableHook.java @@ -39,6 +39,7 @@ public final class SpiPortableHook implements PortableHook { public static final int DISTRIBUTED_OBJECT_EVENT = 5; public static final int MAP_PARTITION_LOST_EVENT = 6; public static final int PARTITION_LOST_EVENT = 7; + public static final int CACHE_PARTITION_LOST_EVENT = 8; @Override public int getFactoryId() { @@ -64,6 +65,8 @@ public Portable create(int classId) { return new PortableMapPartitionLostEvent(); case PARTITION_LOST_EVENT: return new PortablePartitionLostEvent(); + case CACHE_PARTITION_LOST_EVENT: + return new PortableCachePartitionLostEvent(); default: return null; } diff --git a/hazelcast/src/main/resources/hazelcast-config-3.6.xsd b/hazelcast/src/main/resources/hazelcast-config-3.6.xsd index b91a8b353cb4..9a0f98f7c129 100644 --- a/hazelcast/src/main/resources/hazelcast-config-3.6.xsd +++ b/hazelcast/src/main/resources/hazelcast-config-3.6.xsd @@ -468,6 +468,7 @@
+ diff --git a/hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerConfigTest.java b/hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerConfigTest.java new file mode 100644 index 000000000000..52deb76b5fc1 --- /dev/null +++ b/hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerConfigTest.java @@ -0,0 +1,78 @@ +package com.hazelcast.cache; + +import com.hazelcast.cache.impl.CacheService; +import com.hazelcast.cache.impl.HazelcastServerCachingProvider; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; +import com.hazelcast.config.CachePartitionLostListenerConfig; +import com.hazelcast.config.CacheSimpleConfig; +import com.hazelcast.config.Config; +import com.hazelcast.config.XmlConfigBuilder; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.spi.EventRegistration; +import com.hazelcast.spi.EventService; +import com.hazelcast.test.AssertTask; +import com.hazelcast.test.HazelcastSerialClassRunner; +import com.hazelcast.test.HazelcastTestSupport; +import com.hazelcast.test.TestHazelcastInstanceFactory; +import com.hazelcast.test.annotation.ParallelTest; +import com.hazelcast.test.annotation.QuickTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import javax.cache.CacheManager; +import java.io.IOException; +import java.net.URL; +import java.util.Collection; +import java.util.List; + +import static com.hazelcast.cache.impl.HazelcastServerCachingProvider.createCachingProvider; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; + +@RunWith(HazelcastSerialClassRunner.class) +@Category({QuickTest.class, ParallelTest.class}) +public class CachePartitionLostListenerConfigTest extends HazelcastTestSupport { + + private final URL configUrl = getClass().getClassLoader().getResource("test-hazelcast-jcache-partition-lost-listener.xml"); + + @Test + public void testCachePartitionLostListener_registeredViaImplementationInConfigObject() { + + final TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(); + final Config config = new Config(); + final String cacheName = "myCache"; + final CacheSimpleConfig cacheConfig = config.getCacheConfig(cacheName); + final CachePartitionLostListener listener = mock(CachePartitionLostListener.class); + cacheConfig.addCachePartitionLostListenerConfig(new CachePartitionLostListenerConfig(listener)); + final int backupCount = 0; + cacheConfig.setBackupCount(backupCount); + + final HazelcastInstance instance = factory.newHazelcastInstance(config); + final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + cacheManager.getCache(cacheName); + + final EventService eventService = getNode(instance).getNodeEngine().getEventService(); + + assertTrueEventually(new AssertTask() { + @Override + public void run() + throws Exception { + final Collection registrations = eventService.getRegistrations(CacheService.SERVICE_NAME, cacheName); + assertFalse(registrations.isEmpty()); + } + }); + } + + @Test + public void cacheConfigXmlTest() throws IOException { + final String cacheName = "cacheWithPartitionLostListener"; + Config config = new XmlConfigBuilder(configUrl).build(); + CacheSimpleConfig cacheConfig = config.getCacheConfig(cacheName); + List configs = cacheConfig.getPartitionLostListenerConfigs(); + assertEquals(1, configs.size()); + assertEquals("DummyCachePartitionLostListenerImpl", configs.get(0).getClassName()); + } +} diff --git a/hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerStressTest.java b/hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerStressTest.java new file mode 100644 index 000000000000..42621a7e04ee --- /dev/null +++ b/hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerStressTest.java @@ -0,0 +1,175 @@ +package com.hazelcast.cache; + +import com.hazelcast.cache.impl.HazelcastServerCachingProvider; +import com.hazelcast.cache.impl.event.CachePartitionLostEvent; +import com.hazelcast.config.CacheConfig; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.cache.CachePartitionLostListenerTest.EventCollectingCachePartitionLostListener; +import com.hazelcast.partition.AbstractPartitionLostListenerTest; +import com.hazelcast.test.AssertTask; +import com.hazelcast.test.HazelcastSerialClassRunner; +import com.hazelcast.test.annotation.SlowTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import java.util.ArrayList; + +import java.util.List; +import java.util.Map; + +import static com.hazelcast.cache.impl.HazelcastServerCachingProvider.createCachingProvider; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(HazelcastSerialClassRunner.class) +@Category(SlowTest.class) + public class CachePartitionLostListenerStressTest + extends AbstractPartitionLostListenerTest { + + protected int getNodeCount() { + return 5; + } + + protected int getCacheEntryCount() { + return 10000; + } + + @Test + public void test_cachePartitionLostListenerInvoked_when1NodeCrashed_withoutData() + throws InterruptedException { + testCachePartitionLostListener(1, false); + } + + @Test + public void test_cachePartitionLostListenerInvoked_when1NodeCrashed_withData() + throws InterruptedException { + testCachePartitionLostListener(1, true); + } + + @Test + public void test_cachePartitionLostListenerInvoked_when2NodesCrashed_withoutData() + throws InterruptedException { + testCachePartitionLostListener(2, false); + } + + @Test + public void test_cachePartitionLostListenerInvoked_when2NodesCrashed_withData() + throws InterruptedException { + testCachePartitionLostListener(2, true); + } + + @Test + public void test_cachePartitionLostListenerInvoked_when3NodesCrashed_withoutData() + throws InterruptedException { + testCachePartitionLostListener(3, false); + } + + @Test + public void test_cachePartitionLostListenerInvoked_when3NodesCrashed_withData() + throws InterruptedException { + testCachePartitionLostListener(3, true); + } + + @Test + public void test_cachePartitionLostListenerInvoked_when4NodesCrashed_withoutData() + throws InterruptedException { + testCachePartitionLostListener(4, false); + } + + @Test + public void test_cachePartitionLostListenerInvoked_when4NodesCrashed_withData() + throws InterruptedException { + testCachePartitionLostListener(4, true); + } + + private void testCachePartitionLostListener(final int numberOfNodesToCrash, final boolean withData) { + final List instances = getCreatedInstancesShuffledAfterWarmedUp(); + + List survivingInstances = new ArrayList(instances); + final List terminatingInstances = survivingInstances.subList(0, numberOfNodesToCrash); + survivingInstances = survivingInstances.subList(numberOfNodesToCrash, instances.size()); + + final HazelcastInstance instance = survivingInstances.get(0); + final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final List listeners = registerListeners(cacheManager); + + if (withData) { + for (int i = 0; i < getNodeCount(); i++) { + final Cache cache = cacheManager.getCache(getIthCacheName(i)); + for (int j = 0; j < getCacheEntryCount(); j++) { + cache.put(j, j); + } + } + } + + final String log = "Surviving: " + survivingInstances + " Terminating: " + terminatingInstances; + final Map survivingPartitions = getMinReplicaIndicesByPartitionId(survivingInstances); + + terminateInstances(terminatingInstances); + waitAllForSafeState(survivingInstances); + + for (int i = 0; i < getNodeCount(); i++) { + assertListenerInvocationsEventually(numberOfNodesToCrash, log, survivingPartitions, listeners.get(i), i); + } + + for (int i = 0; i < getNodeCount(); i++) { + cacheManager.destroyCache(getIthCacheName(i)); + } + cacheManager.close(); + cachingProvider.close(); + } + + private void assertListenerInvocationsEventually(final int numberOfNodesToCrash, final String log, + final Map survivingPartitions, + final EventCollectingCachePartitionLostListener listener, final int index) { + assertTrueEventually(new AssertTask() { + @Override + public void run() + throws Exception { + if (index < numberOfNodesToCrash) { + assertLostPartitions(log, listener, survivingPartitions); + } else { + final String message = log + " listener-" + index + " should not be invoked!"; + assertTrue(message, listener.getEvents().isEmpty()); + } + } + }); + } + + private void assertLostPartitions(final String log, final CachePartitionLostListenerTest.EventCollectingCachePartitionLostListener listener, + final Map survivingPartitions) { + final List events = listener.getEvents(); + + assertFalse(survivingPartitions.isEmpty()); + + for (CachePartitionLostEvent event : events) { + final int failedPartitionId = event.getPartitionId(); + final Integer survivingReplicaIndex = survivingPartitions.get(failedPartitionId); + if (survivingReplicaIndex != null) { + final String message = + log + ", PartitionId: " + failedPartitionId + " SurvivingReplicaIndex: " + survivingReplicaIndex + + " Cache Name: " + event.getName(); + assertTrue(message, survivingReplicaIndex > listener.getBackupCount()); + } + } + } + + private List registerListeners(final CacheManager cacheManager) { + final CacheConfig config = new CacheConfig(); + final List listeners = new ArrayList(); + for (int i = 0; i < getNodeCount(); i++) { + final EventCollectingCachePartitionLostListener listener = new EventCollectingCachePartitionLostListener(i); + listeners.add(listener); + config.setBackupCount(i); + final Cache cache = cacheManager.createCache(getIthCacheName(i), config); + final ICache iCache = cache.unwrap(ICache.class); + iCache.addPartitionLostListener(listener); + } + return listeners; + } + +} \ No newline at end of file diff --git a/hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerTest.java b/hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerTest.java new file mode 100644 index 000000000000..a04aabf9acf1 --- /dev/null +++ b/hazelcast/src/test/java/com/hazelcast/cache/CachePartitionLostListenerTest.java @@ -0,0 +1,152 @@ +package com.hazelcast.cache; + +import com.hazelcast.cache.impl.CacheService; +import com.hazelcast.cache.impl.HazelcastServerCachingProvider; +import com.hazelcast.cache.impl.event.CachePartitionLostEvent; +import com.hazelcast.cache.impl.event.CachePartitionLostListener; +import com.hazelcast.config.CacheConfig; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.instance.Node; +import com.hazelcast.nio.Address; +import com.hazelcast.partition.AbstractPartitionLostListenerTest; +import com.hazelcast.partition.InternalPartition; +import com.hazelcast.partition.InternalPartitionLostEvent; +import com.hazelcast.test.AssertTask; +import com.hazelcast.test.HazelcastSerialClassRunner; +import com.hazelcast.test.annotation.ParallelTest; +import com.hazelcast.test.annotation.QuickTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static com.hazelcast.cache.impl.HazelcastServerCachingProvider.createCachingProvider; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +@RunWith(HazelcastSerialClassRunner.class) +@Category({QuickTest.class, ParallelTest.class}) +public class CachePartitionLostListenerTest + extends AbstractPartitionLostListenerTest { + + @Override + public int getNodeCount() { + return 2; + } + + public static class EventCollectingCachePartitionLostListener + implements CachePartitionLostListener { + + private final List events = Collections.synchronizedList(new LinkedList()); + + private final int backupCount; + + public EventCollectingCachePartitionLostListener(int backupCount) { + this.backupCount = backupCount; + } + + @Override + public void partitionLost(CachePartitionLostEvent event) { + this.events.add(event); + } + + public List getEvents() { + synchronized (events) { + return new ArrayList(events); + } + } + + public int getBackupCount() { + return backupCount; + } + } + + @Test + public void test_partitionLostListenerInvoked(){ + final List instances = getCreatedInstancesShuffledAfterWarmedUp(1); + final HazelcastInstance instance = instances.get(0); + final EventCollectingCachePartitionLostListener listener = new EventCollectingCachePartitionLostListener(0); + final HazelcastServerCachingProvider cachingProvider = createCachingProvider(instance); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final CacheConfig config = new CacheConfig(); + final Cache cache = cacheManager.createCache(getIthCacheName(0), config); + final ICache iCache = cache.unwrap(ICache.class); + + iCache.addPartitionLostListener(listener); + + final InternalPartitionLostEvent internalEvent = new InternalPartitionLostEvent(1, 1, null); + final CacheService cacheService = getNode(instance).getNodeEngine().getService(CacheService.SERVICE_NAME); + cacheService.onPartitionLost(internalEvent); + + assertTrueEventually(new AssertTask() { + @Override + public void run() + throws Exception { + final List events = listener.getEvents(); + assertEquals(1, events.size()); + final CachePartitionLostEvent event = events.get(0); + assertEquals(internalEvent.getPartitionId(), event.getPartitionId()); + } + }); + + cacheManager.destroyCache(getIthCacheName(0)); + cacheManager.close(); + cachingProvider.close(); + } + + @Test + public void test_partitionLostListenerInvoked_whenNodeCrashed() { + + final List instances = getCreatedInstancesShuffledAfterWarmedUp(2); + final HazelcastInstance survivingInstance = instances.get(0); + final HazelcastInstance terminatingInstance = instances.get(1); + final EventCollectingCachePartitionLostListener listener = new EventCollectingCachePartitionLostListener(0); + final HazelcastServerCachingProvider cachingProvider = createCachingProvider(survivingInstance); + final CacheManager cacheManager = cachingProvider.getCacheManager(); + final CacheConfig config = new CacheConfig(); + config.setBackupCount(0); + final Cache cache = cacheManager.createCache(getIthCacheName(0), config); + final ICache iCache = cache.unwrap(ICache.class); + + iCache.addPartitionLostListener(listener); + + final Set survivingPartitionIds = new HashSet(); + final Node survivingNode = getNode(survivingInstance); + final Address survivingAddress = survivingNode.getThisAddress(); + + for (InternalPartition partition : survivingNode.getPartitionService().getPartitions()) { + if (survivingAddress.equals(partition.getReplicaAddress(0))) { + survivingPartitionIds.add(partition.getPartitionId()); + } + } + + terminatingInstance.getLifecycleService().terminate(); + waitAllForSafeState(survivingInstance); + + assertTrueEventually(new AssertTask() { + @Override + public void run() + throws Exception { + final List events = listener.getEvents(); + assertFalse(events.isEmpty()); + for (CachePartitionLostEvent event : events) { + assertFalse(survivingPartitionIds.contains(event.getPartitionId())); + } + } + }); + + cacheManager.destroyCache(getIthCacheName(0)); + cacheManager.close(); + cachingProvider.close(); + + } + +} diff --git a/hazelcast/src/test/java/com/hazelcast/config/XMLConfigBuilderTest.java b/hazelcast/src/test/java/com/hazelcast/config/XMLConfigBuilderTest.java index 0288c91e9b33..53df3aa06314 100644 --- a/hazelcast/src/test/java/com/hazelcast/config/XMLConfigBuilderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/config/XMLConfigBuilderTest.java @@ -691,6 +691,43 @@ private String createMapPartitionLostListenerConfiguredXml(String mapName, Strin "\n"; } + @Test + public void testCachePartitionLostListenerConfig() { + String cacheName = "cache1"; + String listenerName = "DummyCachePartitionLostListenerImpl"; + String xml = createCachePartitionLostListenerConfiguredXml(cacheName, listenerName); + + Config config = buildConfig(xml); + CacheSimpleConfig cacheConfig = config.getCacheConfig("cache1"); + assertCachePartitionLostListener(listenerName, cacheConfig); + } + + @Test + public void testCachePartitionLostListenerConfigReadOnly() { + String cacheName = "cache1"; + String listenerName = "DummyCachePartitionLostListenerImpl"; + String xml = createCachePartitionLostListenerConfiguredXml(cacheName, listenerName); + + Config config = buildConfig(xml); + CacheSimpleConfig cacheConfig = config.findCacheConfig("cache1"); + assertCachePartitionLostListener(listenerName, cacheConfig); + } + + private void assertCachePartitionLostListener(String listenerName, CacheSimpleConfig cacheConfig) { + assertFalse(cacheConfig.getPartitionLostListenerConfigs().isEmpty()); + assertEquals(listenerName, cacheConfig.getPartitionLostListenerConfigs().get(0).getClassName()); + } + + private String createCachePartitionLostListenerConfiguredXml(String cacheName, String listenerName) { + return "\n" + + "\n" + + "\n" + + "" + listenerName + "\n" + + "\n" + + "\n" + + "\n"; + } + private void testXSDConfigXML(String xmlFileName) throws SAXException, IOException { SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); URL schemaResource = XMLConfigBuilderTest.class.getClassLoader().getResource("hazelcast-config-3.5.xsd"); diff --git a/hazelcast/src/test/java/com/hazelcast/partition/AbstractPartitionLostListenerTest.java b/hazelcast/src/test/java/com/hazelcast/partition/AbstractPartitionLostListenerTest.java index e9f94296f2bd..4785741a3d5b 100644 --- a/hazelcast/src/test/java/com/hazelcast/partition/AbstractPartitionLostListenerTest.java +++ b/hazelcast/src/test/java/com/hazelcast/partition/AbstractPartitionLostListenerTest.java @@ -101,6 +101,10 @@ final protected String getIthMapName(final int i) { return "map-" + i; } + final protected String getIthCacheName(final int i) { + return "cache-" + i; + } + final protected Map getMinReplicaIndicesByPartitionId(final List instances) { final Map survivingPartitions = new HashMap(); for (HazelcastInstance instance : instances) { diff --git a/hazelcast/src/test/resources/test-hazelcast-jcache-partition-lost-listener.xml b/hazelcast/src/test/resources/test-hazelcast-jcache-partition-lost-listener.xml new file mode 100644 index 000000000000..a5abdce92669 --- /dev/null +++ b/hazelcast/src/test/resources/test-hazelcast-jcache-partition-lost-listener.xml @@ -0,0 +1,33 @@ + + + + + + + test-group1 + test-pass1 + + + + + DummyCachePartitionLostListenerImpl + + + +