diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/InterceptorRegistry.java b/hazelcast/src/main/java/com/hazelcast/map/impl/InterceptorRegistry.java new file mode 100644 index 000000000000..33c9e0aba6bb --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/InterceptorRegistry.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2008-2016, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.map.impl; + +import com.hazelcast.map.MapInterceptor; +import com.hazelcast.spi.impl.operationexecutor.classic.PartitionOperationThread; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; + +/** + * Registry for all {@link MapInterceptor} of an {@code IMap} + * + * Interceptors are read mostly and this registry keeps all registered interceptor in an array to easily iterate on them. + * Other than that, synchronized blocks are used to prevent leaks when concurrently registering/de-registering interceptors. + * Keep in mind that all registration/de-registration operations are done in generic-operation-threads, in other words, + * synchronized methods are not used in partition-threads. + * + * This registry is created per map. + */ +public class InterceptorRegistry { + + private volatile List interceptors = emptyList(); + private volatile Map id2InterceptorMap = emptyMap(); + + /** + * Gives an instance of this registry according to generic-operation-thread count. + */ + public InterceptorRegistry() { + } + + /** + * Returns all registered interceptors. + * + * This method is called by {@link PartitionOperationThread} + * + * @return all registered interceptors + * @see com.hazelcast.map.impl.recordstore.DefaultRecordStore#put + */ + public List getInterceptors() { + return interceptors; + } + + public Map getId2InterceptorMap() { + return id2InterceptorMap; + } + + /** + * Registers a {@link MapInterceptor} for the supplied `id`. + * If there is no registration associated with the `id`, registers interceptor, + * otherwise silently ignores registration. + * + * This method is called by {@link com.hazelcast.spi.impl.operationexecutor.classic.GenericOperationThread} + * when registering via {@link com.hazelcast.map.impl.operation.AddInterceptorOperation} + * + * @param id id of the interceptor + * @param interceptor supplied {@link MapInterceptor} + */ + public synchronized void register(String id, MapInterceptor interceptor) { + assert !(Thread.currentThread() instanceof PartitionOperationThread); + + if (id2InterceptorMap.containsKey(id)) { + return; + } + + Map tmpMap = new HashMap(id2InterceptorMap); + tmpMap.put(id, interceptor); + + id2InterceptorMap = unmodifiableMap(tmpMap); + + List tmpInterceptors = new ArrayList(interceptors); + tmpInterceptors.add(interceptor); + + interceptors = unmodifiableList(tmpInterceptors); + } + + /** + * De-registers {@link MapInterceptor} for the supplied `id`, if there is any. + * + * This method is called by {@link com.hazelcast.spi.impl.operationexecutor.classic.GenericOperationThread} + * when de-registering via {@link com.hazelcast.map.impl.operation.RemoveInterceptorOperation} + * + * @param id id of the interceptor + */ + public synchronized void deregister(String id) { + assert !(Thread.currentThread() instanceof PartitionOperationThread); + + if (!id2InterceptorMap.containsKey(id)) { + return; + } + + Map tmpMap = new HashMap(id2InterceptorMap); + MapInterceptor removedInterceptor = tmpMap.remove(id); + + id2InterceptorMap = unmodifiableMap(tmpMap); + + List tmpInterceptors = new ArrayList(interceptors); + tmpInterceptors.remove(removedInterceptor); + + interceptors = unmodifiableList(tmpInterceptors); + } + +} diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java b/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java index a0fb907dfece..4e6adf44511d 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java @@ -22,7 +22,6 @@ import com.hazelcast.core.IFunction; import com.hazelcast.core.PartitioningStrategy; import com.hazelcast.internal.serialization.SerializationService; -import com.hazelcast.map.MapInterceptor; import com.hazelcast.map.impl.eviction.EvictionChecker; import com.hazelcast.map.impl.eviction.EvictionCheckerImpl; import com.hazelcast.map.impl.eviction.Evictor; @@ -44,10 +43,6 @@ import com.hazelcast.wan.WanReplicationPublisher; import com.hazelcast.wan.WanReplicationService; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import static com.hazelcast.map.impl.SizeEstimators.createNearCacheSizeEstimator; @@ -61,7 +56,6 @@ public class MapContainer { protected final String name; protected final String quorumName; protected final MapServiceContext mapServiceContext; - protected final Map interceptorMap; protected final Indexes indexes; protected final Extractors extractors; protected final SizeEstimator nearCacheSizeEstimator; @@ -69,7 +63,7 @@ public class MapContainer { protected final MapStoreContext mapStoreContext; protected final SerializationService serializationService; protected final QueryEntryFactory queryEntryFactory; - protected final List interceptors; + protected final InterceptorRegistry interceptorRegistry; protected final IFunction toDataFunction = new IFunction() { @Override public Data apply(Object input) { @@ -107,8 +101,6 @@ public MapContainer(final String name, final MapConfig mapConfig, final MapServi this.recordFactoryConstructor = createRecordFactoryConstructor(serializationService); this.queryEntryFactory = new QueryEntryFactory(mapConfig.getCacheDeserializedValues()); initWanReplication(nodeEngine); - this.interceptors = new CopyOnWriteArrayList(); - this.interceptorMap = new ConcurrentHashMap(); this.nearCacheSizeEstimator = createNearCacheSizeEstimator(mapConfig.getNearCacheConfig()); this.mapStoreContext = createMapStoreContext(this); this.mapStoreContext.start(); @@ -116,6 +108,7 @@ public MapContainer(final String name, final MapConfig mapConfig, final MapServi this.indexes = new Indexes(serializationService, extractors); this.evictor = createEvictor(mapServiceContext); this.memberNearCacheInvalidationEnabled = isNearCacheEnabled() && mapConfig.getNearCacheConfig().isInvalidateOnChange(); + this.interceptorRegistry = new InterceptorRegistry(); } // this method is overridden. @@ -183,27 +176,6 @@ public MapMergePolicy getWanMergePolicy() { return wanMergePolicy; } - public void addInterceptor(String id, MapInterceptor interceptor) { - - removeInterceptor(id); - - interceptorMap.put(id, interceptor); - interceptors.add(interceptor); - } - - public List getInterceptors() { - return interceptors; - } - - public Map getInterceptorMap() { - return interceptorMap; - } - - public void removeInterceptor(String id) { - MapInterceptor interceptor = interceptorMap.remove(id); - interceptors.remove(interceptor); - } - public boolean isWanReplicationEnabled() { if (wanReplicationPublisher == null || wanMergePolicy == null) { return false; @@ -309,6 +281,10 @@ public void decreaseInvalidationListenerCount() { public boolean isInvalidationEnabled() { return isMemberNearCacheInvalidationEnabled() || hasInvalidationListener(); } + + public InterceptorRegistry getInterceptorRegistry() { + return interceptorRegistry; + } } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/MapServiceContextImpl.java b/hazelcast/src/main/java/com/hazelcast/map/impl/MapServiceContextImpl.java index c227b17a3aa0..1fddb4151893 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/MapServiceContextImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/MapServiceContextImpl.java @@ -349,7 +349,8 @@ public Data toData(Object object) { @Override public void interceptAfterGet(String mapName, Object value) { - List interceptors = getMapContainer(mapName).getInterceptors(); + MapContainer mapContainer = getMapContainer(mapName); + List interceptors = mapContainer.getInterceptorRegistry().getInterceptors(); if (!interceptors.isEmpty()) { value = toObject(value); for (MapInterceptor interceptor : interceptors) { @@ -360,7 +361,8 @@ public void interceptAfterGet(String mapName, Object value) { @Override public Object interceptPut(String mapName, Object oldValue, Object newValue) { - List interceptors = getMapContainer(mapName).getInterceptors(); + MapContainer mapContainer = getMapContainer(mapName); + List interceptors = mapContainer.getInterceptorRegistry().getInterceptors(); Object result = null; if (!interceptors.isEmpty()) { result = toObject(newValue); @@ -377,7 +379,8 @@ public Object interceptPut(String mapName, Object oldValue, Object newValue) { @Override public void interceptAfterPut(String mapName, Object newValue) { - List interceptors = getMapContainer(mapName).getInterceptors(); + MapContainer mapContainer = getMapContainer(mapName); + List interceptors = mapContainer.getInterceptorRegistry().getInterceptors(); if (!interceptors.isEmpty()) { newValue = toObject(newValue); for (MapInterceptor interceptor : interceptors) { @@ -388,7 +391,8 @@ public void interceptAfterPut(String mapName, Object newValue) { @Override public Object interceptRemove(String mapName, Object value) { - List interceptors = getMapContainer(mapName).getInterceptors(); + MapContainer mapContainer = getMapContainer(mapName); + List interceptors = mapContainer.getInterceptorRegistry().getInterceptors(); Object result = null; if (!interceptors.isEmpty()) { result = toObject(value); @@ -404,7 +408,9 @@ public Object interceptRemove(String mapName, Object value) { @Override public void interceptAfterRemove(String mapName, Object value) { - List interceptors = getMapContainer(mapName).getInterceptors(); + MapContainer mapContainer = getMapContainer(mapName); + InterceptorRegistry interceptorRegistry = mapContainer.getInterceptorRegistry(); + List interceptors = interceptorRegistry.getInterceptors(); if (!interceptors.isEmpty()) { for (MapInterceptor interceptor : interceptors) { value = toObject(value); @@ -416,7 +422,7 @@ public void interceptAfterRemove(String mapName, Object value) { @Override public void addInterceptor(String id, String mapName, MapInterceptor interceptor) { MapContainer mapContainer = getMapContainer(mapName); - mapContainer.addInterceptor(id, interceptor); + mapContainer.getInterceptorRegistry().register(id, interceptor); } @@ -427,13 +433,16 @@ public String generateInterceptorId(String mapName, MapInterceptor interceptor) @Override public void removeInterceptor(String mapName, String id) { - getMapContainer(mapName).removeInterceptor(id); + MapContainer mapContainer = getMapContainer(mapName); + mapContainer.getInterceptorRegistry().deregister(id); } // todo interceptors should get a wrapped object which includes the serialized version @Override public Object interceptGet(String mapName, Object value) { - List interceptors = getMapContainer(mapName).getInterceptors(); + MapContainer mapContainer = getMapContainer(mapName); + InterceptorRegistry interceptorRegistry = mapContainer.getInterceptorRegistry(); + List interceptors = interceptorRegistry.getInterceptors(); Object result = null; if (!interceptors.isEmpty()) { result = toObject(value); @@ -449,8 +458,8 @@ public Object interceptGet(String mapName, Object value) { @Override public boolean hasInterceptor(String mapName) { - List interceptors = getMapContainer(mapName).getInterceptors(); - return !interceptors.isEmpty(); + MapContainer mapContainer = getMapContainer(mapName); + return !mapContainer.getInterceptorRegistry().getInterceptors().isEmpty(); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/AddInterceptorOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/AddInterceptorOperation.java index 0086a3fcdde7..347154e312b2 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/AddInterceptorOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/AddInterceptorOperation.java @@ -22,8 +22,9 @@ import com.hazelcast.nio.ObjectDataInput; import com.hazelcast.nio.ObjectDataOutput; import com.hazelcast.spi.AbstractOperation; -import com.hazelcast.spi.impl.MutatingOperation; import com.hazelcast.spi.NamedOperation; +import com.hazelcast.spi.impl.MutatingOperation; + import java.io.IOException; public class AddInterceptorOperation extends AbstractOperation implements MutatingOperation, NamedOperation { @@ -51,7 +52,7 @@ public String getServiceName() { public void run() { mapService = getService(); MapContainer mapContainer = mapService.getMapServiceContext().getMapContainer(mapName); - mapContainer.addInterceptor(id, mapInterceptor); + mapContainer.getInterceptorRegistry().register(id, mapInterceptor); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/PostJoinMapOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/PostJoinMapOperation.java index b8d0ea1819d6..6170fd27dd53 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/PostJoinMapOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/PostJoinMapOperation.java @@ -17,6 +17,7 @@ package com.hazelcast.map.impl.operation; import com.hazelcast.map.MapInterceptor; +import com.hazelcast.map.impl.InterceptorRegistry; import com.hazelcast.map.impl.MapContainer; import com.hazelcast.map.impl.MapService; import com.hazelcast.map.impl.MapServiceContext; @@ -55,8 +56,9 @@ public void addMapIndex(MapContainer mapContainer) { } public void addMapInterceptors(MapContainer mapContainer) { - List interceptorList = mapContainer.getInterceptors(); - Map interceptorMap = mapContainer.getInterceptorMap(); + InterceptorRegistry interceptorRegistry = mapContainer.getInterceptorRegistry(); + List interceptorList = interceptorRegistry.getInterceptors(); + Map interceptorMap = interceptorRegistry.getId2InterceptorMap(); Map revMap = new HashMap(); for (Map.Entry entry : interceptorMap.entrySet()) { revMap.put(entry.getValue(), entry.getKey()); @@ -119,11 +121,12 @@ public void run() throws Exception { } for (InterceptorInfo interceptorInfo : interceptorInfoList) { final MapContainer mapContainer = mapServiceContext.getMapContainer(interceptorInfo.mapName); - Map interceptorMap = mapContainer.getInterceptorMap(); + InterceptorRegistry interceptorRegistry = mapContainer.getInterceptorRegistry(); + Map interceptorMap = interceptorRegistry.getId2InterceptorMap(); List> entryList = interceptorInfo.interceptors; for (Map.Entry entry : entryList) { if (!interceptorMap.containsKey(entry.getKey())) { - mapContainer.addInterceptor(entry.getKey(), entry.getValue()); + interceptorRegistry.register(entry.getKey(), entry.getValue()); } } } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/RemoveInterceptorOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/RemoveInterceptorOperation.java index f733f931a050..29b9f2474340 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/RemoveInterceptorOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/RemoveInterceptorOperation.java @@ -45,7 +45,7 @@ public void run() { mapService = getService(); MapServiceContext mapServiceContext = mapService.getMapServiceContext(); MapContainer mapContainer = mapServiceContext.getMapContainer(mapName); - mapContainer.removeInterceptor(id); + mapContainer.getInterceptorRegistry().deregister(id); } @Override diff --git a/hazelcast/src/test/java/com/hazelcast/map/impl/InterceptorRegistryTest.java b/hazelcast/src/test/java/com/hazelcast/map/impl/InterceptorRegistryTest.java new file mode 100644 index 000000000000..700e2da90bb9 --- /dev/null +++ b/hazelcast/src/test/java/com/hazelcast/map/impl/InterceptorRegistryTest.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2008-2016, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.map.impl; + +import com.hazelcast.map.MapInterceptor; +import com.hazelcast.test.HazelcastParallelClassRunner; +import com.hazelcast.test.HazelcastTestSupport; +import com.hazelcast.test.annotation.NightlyTest; +import com.hazelcast.test.annotation.ParallelTest; +import com.hazelcast.test.annotation.QuickTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static junit.framework.TestCase.assertFalse; +import static junit.framework.TestCase.assertTrue; + +@RunWith(HazelcastParallelClassRunner.class) +@Category({QuickTest.class, ParallelTest.class}) +public class InterceptorRegistryTest extends HazelcastTestSupport { + + private final InterceptorRegistry registry = new InterceptorRegistry(); + + @Test + public void test_register() throws Exception { + TestMapInterceptor interceptor = new TestMapInterceptor(); + + registry.register(interceptor.id, interceptor); + + List interceptors = registry.getInterceptors(); + + assertTrue(interceptors.contains(interceptor)); + } + + @Test + public void test_deregister() throws Exception { + TestMapInterceptor interceptor = new TestMapInterceptor(); + + registry.register(interceptor.id, interceptor); + registry.deregister(interceptor.id); + + List interceptors = registry.getInterceptors(); + + assertFalse(interceptors.contains(interceptor)); + } + + + @Test + @Category(NightlyTest.class) + public void testInternalStructuresEmptied_after_concurrent_register_deregister() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(false); + + List threads = new ArrayList(); + for (int i = 0; i < 10; i++) { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + TestMapInterceptor interceptor = new TestMapInterceptor(); + while (!stop.get()) { + registry.register(interceptor.id, interceptor); + registry.deregister(interceptor.id); + } + } + }); + thread.start(); + threads.add(thread); + } + + sleepSeconds(10); + stop.set(true); + + for (Thread thread : threads) { + thread.join(); + } + + // expecting internals empty. + assertTrue("Id2Interceptor map should be empty", registry.getId2InterceptorMap().isEmpty()); + assertTrue("Interceptor list should be empty", registry.getInterceptors().isEmpty()); + } + + + private static class TestMapInterceptor implements MapInterceptor { + + public final String id = TestMapInterceptor.class.toString(); + + @Override + public Object interceptGet(Object value) { + return null; + } + + @Override + public void afterGet(Object value) { + + } + + @Override + public Object interceptPut(Object oldValue, Object newValue) { + return null; + } + + @Override + public void afterPut(Object value) { + + } + + @Override + public Object interceptRemove(Object removedValue) { + return null; + } + + @Override + public void afterRemove(Object value) { + + } + + @Override + public int hashCode() { + return id != null ? id.hashCode() : 0; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + TestMapInterceptor that = (TestMapInterceptor) o; + + return id != null ? id.equals(that.id) : that.id == null; + + } + } +} \ No newline at end of file