Skip to content

Commit

Permalink
Merge pull request #7531 from ahmetmircik/fix/3.7/interceptorAddRemov…
Browse files Browse the repository at this point in the history
…eLeak

Fixes possible leak caused by concurrent add/remove of IMap interceptors
  • Loading branch information
Ali committed Feb 17, 2016
2 parents c73bbb2 + bdf6fd9 commit 2cfcc5e
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 47 deletions.
@@ -0,0 +1,124 @@
/*
* Copyright (c) 2008-2016, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.map.impl;

import com.hazelcast.map.MapInterceptor;
import com.hazelcast.spi.impl.operationexecutor.classic.PartitionOperationThread;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;

/**
* Registry for all {@link MapInterceptor} of an {@code IMap}
*
* Interceptors are read mostly and this registry keeps all registered interceptor in an array to easily iterate on them.
* Other than that, synchronized blocks are used to prevent leaks when concurrently registering/de-registering interceptors.
* Keep in mind that all registration/de-registration operations are done in generic-operation-threads, in other words,
* synchronized methods are not used in partition-threads.
*
* This registry is created per map.
*/
public class InterceptorRegistry {

private volatile List<MapInterceptor> interceptors = emptyList();
private volatile Map<String, MapInterceptor> id2InterceptorMap = emptyMap();

/**
* Gives an instance of this registry according to generic-operation-thread count.
*/
public InterceptorRegistry() {
}

/**
* Returns all registered interceptors.
*
* This method is called by {@link PartitionOperationThread}
*
* @return all registered interceptors
* @see com.hazelcast.map.impl.recordstore.DefaultRecordStore#put
*/
public List<MapInterceptor> getInterceptors() {
return interceptors;
}

public Map<String, MapInterceptor> getId2InterceptorMap() {
return id2InterceptorMap;
}

/**
* Registers a {@link MapInterceptor} for the supplied `id`.
* If there is no registration associated with the `id`, registers interceptor,
* otherwise silently ignores registration.
*
* This method is called by {@link com.hazelcast.spi.impl.operationexecutor.classic.GenericOperationThread}
* when registering via {@link com.hazelcast.map.impl.operation.AddInterceptorOperation}
*
* @param id id of the interceptor
* @param interceptor supplied {@link MapInterceptor}
*/
public synchronized void register(String id, MapInterceptor interceptor) {
assert !(Thread.currentThread() instanceof PartitionOperationThread);

if (id2InterceptorMap.containsKey(id)) {
return;
}

Map<String, MapInterceptor> tmpMap = new HashMap<String, MapInterceptor>(id2InterceptorMap);
tmpMap.put(id, interceptor);

id2InterceptorMap = unmodifiableMap(tmpMap);

List<MapInterceptor> tmpInterceptors = new ArrayList<MapInterceptor>(interceptors);
tmpInterceptors.add(interceptor);

interceptors = unmodifiableList(tmpInterceptors);
}

/**
* De-registers {@link MapInterceptor} for the supplied `id`, if there is any.
*
* This method is called by {@link com.hazelcast.spi.impl.operationexecutor.classic.GenericOperationThread}
* when de-registering via {@link com.hazelcast.map.impl.operation.RemoveInterceptorOperation}
*
* @param id id of the interceptor
*/
public synchronized void deregister(String id) {
assert !(Thread.currentThread() instanceof PartitionOperationThread);

if (!id2InterceptorMap.containsKey(id)) {
return;
}

Map<String, MapInterceptor> tmpMap = new HashMap<String, MapInterceptor>(id2InterceptorMap);
MapInterceptor removedInterceptor = tmpMap.remove(id);

id2InterceptorMap = unmodifiableMap(tmpMap);

List<MapInterceptor> tmpInterceptors = new ArrayList<MapInterceptor>(interceptors);
tmpInterceptors.remove(removedInterceptor);

interceptors = unmodifiableList(tmpInterceptors);
}

}
36 changes: 6 additions & 30 deletions hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java
Expand Up @@ -22,7 +22,6 @@
import com.hazelcast.core.IFunction;
import com.hazelcast.core.PartitioningStrategy;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.map.MapInterceptor;
import com.hazelcast.map.impl.eviction.EvictionChecker;
import com.hazelcast.map.impl.eviction.EvictionCheckerImpl;
import com.hazelcast.map.impl.eviction.Evictor;
Expand All @@ -44,10 +43,6 @@
import com.hazelcast.wan.WanReplicationPublisher;
import com.hazelcast.wan.WanReplicationService;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import static com.hazelcast.map.impl.SizeEstimators.createNearCacheSizeEstimator;
Expand All @@ -61,15 +56,14 @@ public class MapContainer {
protected final String name;
protected final String quorumName;
protected final MapServiceContext mapServiceContext;
protected final Map<String, MapInterceptor> interceptorMap;
protected final Indexes indexes;
protected final Extractors extractors;
protected final SizeEstimator nearCacheSizeEstimator;
protected final PartitioningStrategy partitioningStrategy;
protected final MapStoreContext mapStoreContext;
protected final SerializationService serializationService;
protected final QueryEntryFactory queryEntryFactory;
protected final List<MapInterceptor> interceptors;
protected final InterceptorRegistry interceptorRegistry;
protected final IFunction<Object, Data> toDataFunction = new IFunction<Object, Data>() {
@Override
public Data apply(Object input) {
Expand Down Expand Up @@ -107,15 +101,14 @@ public MapContainer(final String name, final MapConfig mapConfig, final MapServi
this.recordFactoryConstructor = createRecordFactoryConstructor(serializationService);
this.queryEntryFactory = new QueryEntryFactory(mapConfig.getCacheDeserializedValues());
initWanReplication(nodeEngine);
this.interceptors = new CopyOnWriteArrayList<MapInterceptor>();
this.interceptorMap = new ConcurrentHashMap<String, MapInterceptor>();
this.nearCacheSizeEstimator = createNearCacheSizeEstimator(mapConfig.getNearCacheConfig());
this.mapStoreContext = createMapStoreContext(this);
this.mapStoreContext.start();
this.extractors = new Extractors(mapConfig.getMapAttributeConfigs());
this.indexes = new Indexes(serializationService, extractors);
this.evictor = createEvictor(mapServiceContext);
this.memberNearCacheInvalidationEnabled = isNearCacheEnabled() && mapConfig.getNearCacheConfig().isInvalidateOnChange();
this.interceptorRegistry = new InterceptorRegistry();
}

// this method is overridden.
Expand Down Expand Up @@ -183,27 +176,6 @@ public MapMergePolicy getWanMergePolicy() {
return wanMergePolicy;
}

public void addInterceptor(String id, MapInterceptor interceptor) {

removeInterceptor(id);

interceptorMap.put(id, interceptor);
interceptors.add(interceptor);
}

public List<MapInterceptor> getInterceptors() {
return interceptors;
}

public Map<String, MapInterceptor> getInterceptorMap() {
return interceptorMap;
}

public void removeInterceptor(String id) {
MapInterceptor interceptor = interceptorMap.remove(id);
interceptors.remove(interceptor);
}

public boolean isWanReplicationEnabled() {
if (wanReplicationPublisher == null || wanMergePolicy == null) {
return false;
Expand Down Expand Up @@ -309,6 +281,10 @@ public void decreaseInvalidationListenerCount() {
public boolean isInvalidationEnabled() {
return isMemberNearCacheInvalidationEnabled() || hasInvalidationListener();
}

public InterceptorRegistry getInterceptorRegistry() {
return interceptorRegistry;
}
}


Expand Up @@ -349,7 +349,8 @@ public Data toData(Object object) {

@Override
public void interceptAfterGet(String mapName, Object value) {
List<MapInterceptor> interceptors = getMapContainer(mapName).getInterceptors();
MapContainer mapContainer = getMapContainer(mapName);
List<MapInterceptor> interceptors = mapContainer.getInterceptorRegistry().getInterceptors();
if (!interceptors.isEmpty()) {
value = toObject(value);
for (MapInterceptor interceptor : interceptors) {
Expand All @@ -360,7 +361,8 @@ public void interceptAfterGet(String mapName, Object value) {

@Override
public Object interceptPut(String mapName, Object oldValue, Object newValue) {
List<MapInterceptor> interceptors = getMapContainer(mapName).getInterceptors();
MapContainer mapContainer = getMapContainer(mapName);
List<MapInterceptor> interceptors = mapContainer.getInterceptorRegistry().getInterceptors();
Object result = null;
if (!interceptors.isEmpty()) {
result = toObject(newValue);
Expand All @@ -377,7 +379,8 @@ public Object interceptPut(String mapName, Object oldValue, Object newValue) {

@Override
public void interceptAfterPut(String mapName, Object newValue) {
List<MapInterceptor> interceptors = getMapContainer(mapName).getInterceptors();
MapContainer mapContainer = getMapContainer(mapName);
List<MapInterceptor> interceptors = mapContainer.getInterceptorRegistry().getInterceptors();
if (!interceptors.isEmpty()) {
newValue = toObject(newValue);
for (MapInterceptor interceptor : interceptors) {
Expand All @@ -388,7 +391,8 @@ public void interceptAfterPut(String mapName, Object newValue) {

@Override
public Object interceptRemove(String mapName, Object value) {
List<MapInterceptor> interceptors = getMapContainer(mapName).getInterceptors();
MapContainer mapContainer = getMapContainer(mapName);
List<MapInterceptor> interceptors = mapContainer.getInterceptorRegistry().getInterceptors();
Object result = null;
if (!interceptors.isEmpty()) {
result = toObject(value);
Expand All @@ -404,7 +408,9 @@ public Object interceptRemove(String mapName, Object value) {

@Override
public void interceptAfterRemove(String mapName, Object value) {
List<MapInterceptor> interceptors = getMapContainer(mapName).getInterceptors();
MapContainer mapContainer = getMapContainer(mapName);
InterceptorRegistry interceptorRegistry = mapContainer.getInterceptorRegistry();
List<MapInterceptor> interceptors = interceptorRegistry.getInterceptors();
if (!interceptors.isEmpty()) {
for (MapInterceptor interceptor : interceptors) {
value = toObject(value);
Expand All @@ -416,7 +422,7 @@ public void interceptAfterRemove(String mapName, Object value) {
@Override
public void addInterceptor(String id, String mapName, MapInterceptor interceptor) {
MapContainer mapContainer = getMapContainer(mapName);
mapContainer.addInterceptor(id, interceptor);
mapContainer.getInterceptorRegistry().register(id, interceptor);
}


Expand All @@ -427,13 +433,16 @@ public String generateInterceptorId(String mapName, MapInterceptor interceptor)

@Override
public void removeInterceptor(String mapName, String id) {
getMapContainer(mapName).removeInterceptor(id);
MapContainer mapContainer = getMapContainer(mapName);
mapContainer.getInterceptorRegistry().deregister(id);
}

// todo interceptors should get a wrapped object which includes the serialized version
@Override
public Object interceptGet(String mapName, Object value) {
List<MapInterceptor> interceptors = getMapContainer(mapName).getInterceptors();
MapContainer mapContainer = getMapContainer(mapName);
InterceptorRegistry interceptorRegistry = mapContainer.getInterceptorRegistry();
List<MapInterceptor> interceptors = interceptorRegistry.getInterceptors();
Object result = null;
if (!interceptors.isEmpty()) {
result = toObject(value);
Expand All @@ -449,8 +458,8 @@ public Object interceptGet(String mapName, Object value) {

@Override
public boolean hasInterceptor(String mapName) {
List<MapInterceptor> interceptors = getMapContainer(mapName).getInterceptors();
return !interceptors.isEmpty();
MapContainer mapContainer = getMapContainer(mapName);
return !mapContainer.getInterceptorRegistry().getInterceptors().isEmpty();
}

@Override
Expand Down
Expand Up @@ -22,8 +22,9 @@
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.AbstractOperation;
import com.hazelcast.spi.impl.MutatingOperation;
import com.hazelcast.spi.NamedOperation;
import com.hazelcast.spi.impl.MutatingOperation;

import java.io.IOException;

public class AddInterceptorOperation extends AbstractOperation implements MutatingOperation, NamedOperation {
Expand Down Expand Up @@ -51,7 +52,7 @@ public String getServiceName() {
public void run() {
mapService = getService();
MapContainer mapContainer = mapService.getMapServiceContext().getMapContainer(mapName);
mapContainer.addInterceptor(id, mapInterceptor);
mapContainer.getInterceptorRegistry().register(id, mapInterceptor);
}

@Override
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.hazelcast.map.impl.operation;

import com.hazelcast.map.MapInterceptor;
import com.hazelcast.map.impl.InterceptorRegistry;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.MapServiceContext;
Expand Down Expand Up @@ -55,8 +56,9 @@ public void addMapIndex(MapContainer mapContainer) {
}

public void addMapInterceptors(MapContainer mapContainer) {
List<MapInterceptor> interceptorList = mapContainer.getInterceptors();
Map<String, MapInterceptor> interceptorMap = mapContainer.getInterceptorMap();
InterceptorRegistry interceptorRegistry = mapContainer.getInterceptorRegistry();
List<MapInterceptor> interceptorList = interceptorRegistry.getInterceptors();
Map<String, MapInterceptor> interceptorMap = interceptorRegistry.getId2InterceptorMap();
Map<MapInterceptor, String> revMap = new HashMap<MapInterceptor, String>();
for (Map.Entry<String, MapInterceptor> entry : interceptorMap.entrySet()) {
revMap.put(entry.getValue(), entry.getKey());
Expand Down Expand Up @@ -119,11 +121,12 @@ public void run() throws Exception {
}
for (InterceptorInfo interceptorInfo : interceptorInfoList) {
final MapContainer mapContainer = mapServiceContext.getMapContainer(interceptorInfo.mapName);
Map<String, MapInterceptor> interceptorMap = mapContainer.getInterceptorMap();
InterceptorRegistry interceptorRegistry = mapContainer.getInterceptorRegistry();
Map<String, MapInterceptor> interceptorMap = interceptorRegistry.getId2InterceptorMap();
List<Map.Entry<String, MapInterceptor>> entryList = interceptorInfo.interceptors;
for (Map.Entry<String, MapInterceptor> entry : entryList) {
if (!interceptorMap.containsKey(entry.getKey())) {
mapContainer.addInterceptor(entry.getKey(), entry.getValue());
interceptorRegistry.register(entry.getKey(), entry.getValue());
}
}
}
Expand Down
Expand Up @@ -45,7 +45,7 @@ public void run() {
mapService = getService();
MapServiceContext mapServiceContext = mapService.getMapServiceContext();
MapContainer mapContainer = mapServiceContext.getMapContainer(mapName);
mapContainer.removeInterceptor(id);
mapContainer.getInterceptorRegistry().deregister(id);
}

@Override
Expand Down

0 comments on commit 2cfcc5e

Please sign in to comment.