From 0a2d9595e0e2a9654024f40f3d61fd00a838285c Mon Sep 17 00:00:00 2001 From: Vassilis Bekiaris Date: Thu, 11 Apr 2019 17:30:13 +0300 Subject: [PATCH] Expire IMap key before locking Introduces SPI for services that need to execute some code before locking a key. Fixes #13272 --- .../operations/AbstractLockOperation.java | 9 + .../lock/operations/LockBackupOperation.java | 1 + .../lock/operations/LockOperation.java | 1 + .../com/hazelcast/map/impl/MapService.java | 14 +- .../hazelcast/spi/LockInterceptorService.java | 29 ++++ .../ExpirationManagerTimeoutTest.java | 55 +++--- .../spi/LockInterceptorServiceTest.java | 159 ++++++++++++++++++ 7 files changed, 232 insertions(+), 36 deletions(-) create mode 100644 hazelcast/src/main/java/com/hazelcast/spi/LockInterceptorService.java create mode 100644 hazelcast/src/test/java/com/hazelcast/spi/LockInterceptorServiceTest.java diff --git a/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/AbstractLockOperation.java b/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/AbstractLockOperation.java index 96657a5246c8..600060fff797 100644 --- a/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/AbstractLockOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/AbstractLockOperation.java @@ -24,6 +24,7 @@ import com.hazelcast.nio.serialization.Data; import com.hazelcast.nio.serialization.IdentifiedDataSerializable; import com.hazelcast.nio.serialization.impl.Versioned; +import com.hazelcast.spi.LockInterceptorService; import com.hazelcast.spi.NamedOperation; import com.hazelcast.spi.ObjectNamespace; import com.hazelcast.spi.Operation; @@ -122,6 +123,14 @@ protected final void setReferenceCallId(long refCallId) { this.referenceCallId = refCallId; } + protected final void interceptLockOperation() { + // if service is a LockInterceptorService, notify it a key is about to be locked + Object targetService = getNodeEngine().getService(namespace.getServiceName()); + if (targetService instanceof LockInterceptorService) { + ((LockInterceptorService) targetService).onBeforeLock(namespace.getObjectName(), key); + } + } + @Override public String getServiceName() { return LockServiceImpl.SERVICE_NAME; diff --git a/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/LockBackupOperation.java b/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/LockBackupOperation.java index c4e5920c75e5..c88f5860ac7e 100644 --- a/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/LockBackupOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/LockBackupOperation.java @@ -41,6 +41,7 @@ public LockBackupOperation(ObjectNamespace namespace, Data key, long threadId, l @Override public void run() throws Exception { + interceptLockOperation(); LockStoreImpl lockStore = getLockStore(); response = lockStore.lock(key, originalCallerUuid, threadId, getReferenceCallId(), leaseTime); } diff --git a/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/LockOperation.java b/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/LockOperation.java index f421cc9aa92b..5437f306b2bd 100644 --- a/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/LockOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/concurrent/lock/operations/LockOperation.java @@ -45,6 +45,7 @@ public LockOperation(ObjectNamespace namespace, Data key, long threadId, long le @Override public void run() throws Exception { + interceptLockOperation(); final boolean lockResult = getLockStore().lock(key, getCallerUuid(), threadId, getReferenceCallId(), leaseTime); response = lockResult; diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/MapService.java b/hazelcast/src/main/java/com/hazelcast/map/impl/MapService.java index ce52bb7cd03f..0fe69070f87b 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/MapService.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/MapService.java @@ -20,13 +20,16 @@ import com.hazelcast.core.DistributedObject; import com.hazelcast.internal.cluster.ClusterStateListener; import com.hazelcast.map.impl.event.MapEventPublishingService; +import com.hazelcast.map.impl.recordstore.RecordStore; import com.hazelcast.monitor.LocalMapStats; +import com.hazelcast.nio.serialization.Data; import com.hazelcast.spi.ClientAwareService; import com.hazelcast.spi.DistributedObjectNamespace; import com.hazelcast.spi.EventFilter; import com.hazelcast.spi.EventPublishingService; import com.hazelcast.spi.EventRegistration; import com.hazelcast.spi.FragmentedMigrationAwareService; +import com.hazelcast.spi.LockInterceptorService; import com.hazelcast.spi.ManagedService; import com.hazelcast.spi.NodeEngine; import com.hazelcast.spi.NotifiableEventListener; @@ -75,7 +78,8 @@ public class MapService implements ManagedService, FragmentedMigrationAwareService, TransactionalService, RemoteService, EventPublishingService, PostJoinAwareService, SplitBrainHandlerService, ReplicationSupportingService, StatisticsAwareService, - PartitionAwareService, ClientAwareService, QuorumAwareService, NotifiableEventListener, ClusterStateListener { + PartitionAwareService, ClientAwareService, QuorumAwareService, NotifiableEventListener, ClusterStateListener, + LockInterceptorService { public static final String SERVICE_NAME = "hz:impl:mapService"; @@ -247,6 +251,14 @@ public void onClusterStateChange(ClusterState newState) { mapServiceContext.onClusterStateChange(newState); } + @Override + public void onBeforeLock(String distributedObjectName, Data key) { + int partitionId = mapServiceContext.getNodeEngine().getPartitionService().getPartitionId(key); + RecordStore recordStore = mapServiceContext.getRecordStore(partitionId, distributedObjectName); + // we have no use for the return value, invoked just for the side-effects + recordStore.getRecordOrNull(key); + } + public static ObjectNamespace getObjectNamespace(String mapName) { return new DistributedObjectNamespace(SERVICE_NAME, mapName); } diff --git a/hazelcast/src/main/java/com/hazelcast/spi/LockInterceptorService.java b/hazelcast/src/main/java/com/hazelcast/spi/LockInterceptorService.java new file mode 100644 index 000000000000..06d88bf9e9b5 --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/spi/LockInterceptorService.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.spi; + +/** + * Interface to be implemented by services that need to intercept lock operation + * for distributed objects it manages. + * + * @param type of key + */ +public interface LockInterceptorService { + + void onBeforeLock(String distributedObjectName, T key); + +} diff --git a/hazelcast/src/test/java/com/hazelcast/map/impl/eviction/ExpirationManagerTimeoutTest.java b/hazelcast/src/test/java/com/hazelcast/map/impl/eviction/ExpirationManagerTimeoutTest.java index c60db8a4c0e0..97790fef1777 100644 --- a/hazelcast/src/test/java/com/hazelcast/map/impl/eviction/ExpirationManagerTimeoutTest.java +++ b/hazelcast/src/test/java/com/hazelcast/map/impl/eviction/ExpirationManagerTimeoutTest.java @@ -16,58 +16,50 @@ package com.hazelcast.map.impl.eviction; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -import java.util.concurrent.TimeUnit; - -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; - import com.hazelcast.config.Config; import com.hazelcast.config.MapConfig; import com.hazelcast.config.MaxSizeConfig; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IMap; -import com.hazelcast.test.HazelcastSerialClassRunner; +import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.HazelcastTestSupport; import com.hazelcast.test.annotation.ParallelTest; import com.hazelcast.test.annotation.QuickTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.concurrent.TimeUnit; -@RunWith(HazelcastSerialClassRunner.class) -@Category({ QuickTest.class, ParallelTest.class }) +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +@RunWith(HazelcastParallelClassRunner.class) +@Category({QuickTest.class, ParallelTest.class}) public class ExpirationManagerTimeoutTest extends HazelcastTestSupport { - @Ignore("https://github.com/hazelcast/hazelcast/issues/13272") @Test public void afterShortExpirationEntryShouldBeAway() throws InterruptedException { final String KEY = "key"; - Config hConfig = new Config().setInstanceName("instance") + Config hConfig = new Config() .addMapConfig(new MapConfig().setName("test") .setMaxSizeConfig(new MaxSizeConfig(200, MaxSizeConfig.MaxSizePolicy.FREE_HEAP_SIZE)) .setTimeToLiveSeconds(20)); final HazelcastInstance node = createHazelcastInstance(hConfig); try { IMap map = node.getMap("test"); - /** - * after 1 second entry should be evicted - */ + // after 1 second entry should be evicted map.put(KEY, "value", 1, TimeUnit.SECONDS); - /** - * short time after adding it to the map, all ok - */ + // short time after adding it to the map, all ok map.lock(KEY); Object object = map.get(KEY); map.unlock(KEY); assertNotNull(object); Thread.sleep(1200); - /** - * More than one second after adding it, now it should be away - */ + + // more than one second after adding it, now it should be away map.lock(KEY); object = map.get(KEY); map.unlock(KEY); @@ -77,33 +69,26 @@ public void afterShortExpirationEntryShouldBeAway() throws InterruptedException } } - @Ignore("https://github.com/hazelcast/hazelcast/issues/13272") @Test public void afterLongerExpirationEntryShouldBeAway() throws InterruptedException { final String KEY = "key"; - Config hConfig = new Config().setInstanceName("instance") + Config hConfig = new Config() .addMapConfig(new MapConfig().setName("test") .setMaxSizeConfig(new MaxSizeConfig(200, MaxSizeConfig.MaxSizePolicy.FREE_HEAP_SIZE)) .setTimeToLiveSeconds(20)); final HazelcastInstance node = createHazelcastInstance(hConfig); try { IMap map = node.getMap("test"); - /** - * after 1 second entry should be evicted - */ + // after 1 second entry should be evicted map.put(KEY, "value", 1, TimeUnit.SECONDS); - /** - * short time after adding it to the map, all ok - */ + // short time after adding it to the map, all ok map.lock(KEY); Object object = map.get(KEY); map.unlock(KEY); assertNotNull(object); Thread.sleep(3600); - /** - * More than one second after adding it, now it should be away - */ + // More than one second after adding it, now it should be away map.lock(KEY); object = map.get(KEY); map.unlock(KEY); diff --git a/hazelcast/src/test/java/com/hazelcast/spi/LockInterceptorServiceTest.java b/hazelcast/src/test/java/com/hazelcast/spi/LockInterceptorServiceTest.java new file mode 100644 index 000000000000..68d37a2059ae --- /dev/null +++ b/hazelcast/src/test/java/com/hazelcast/spi/LockInterceptorServiceTest.java @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.spi; + +import com.hazelcast.concurrent.lock.LockProxySupport; +import com.hazelcast.concurrent.lock.LockService; +import com.hazelcast.concurrent.lock.LockStoreInfo; +import com.hazelcast.config.Config; +import com.hazelcast.config.ServiceConfig; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.nio.serialization.Data; +import com.hazelcast.spi.serialization.SerializationService; +import com.hazelcast.test.HazelcastSerialClassRunner; +import com.hazelcast.test.HazelcastTestSupport; +import com.hazelcast.test.annotation.ParallelTest; +import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.util.ConstructorFunction; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.hazelcast.util.ConcurrencyUtil.getOrPutIfAbsent; +import static org.junit.Assert.assertEquals; + +@RunWith(HazelcastSerialClassRunner.class) +@Category({QuickTest.class, ParallelTest.class}) +public class LockInterceptorServiceTest extends HazelcastTestSupport { + + private static final ConcurrentMap LOCK_COUNTER = new ConcurrentHashMap<>(); + + @Before + public void setup() { + LOCK_COUNTER.clear(); + } + + @Test + public void testLockInterceptorServiceIsConsulted() { + LockInterceptingService implementation = new LockInterceptingService(false); + + testLockingInterceptor(implementation); + + assertLockCount(1); + } + + @Test + public void testObjectIsNotLocked_whenLockInterceptorThrowsException() { + LockInterceptingService implementation = new LockInterceptingService(true); + + testLockingInterceptor(implementation); + + assertLockCount(0); + } + + private void testLockingInterceptor(LockInterceptingService implementation) { + Config config = new Config(); + config.getServicesConfig().addServiceConfig(new ServiceConfig() + .setEnabled(true) + .setName(LockInterceptingService.SERVICE_NAME) + .setImplementation(implementation)); + + HazelcastInstance member = createHazelcastInstance(config); + NodeEngine nodeEngine = getNodeEngineImpl(member); + implementation.serializationService = getSerializationService(member); + + LockProxySupport lockProxySupport = new LockProxySupport( + new DistributedObjectNamespace(LockInterceptingService.SERVICE_NAME, "test-object"), 10000); + + for (int i = 0; i < 100; i++) { + try { + Data key = getSerializationService(member).toData("key" + i); + lockProxySupport.lock(nodeEngine, key); + } catch (RuntimeException e) { + ignore(e); + } + } + } + + private void assertLockCount(int expectedCount) { + for (int i = 0; i < 100; i++) { + assertEquals(expectedCount, LOCK_COUNTER.get("key" + i).get()); + } + } + + public static class LockInterceptingService implements LockInterceptorService, ManagedService { + + public static final String SERVICE_NAME = "test-lock-intercepting-service"; + + private final boolean throwException; + private volatile SerializationService serializationService; + + public LockInterceptingService(boolean throwException) { + this.throwException = throwException; + } + + @Override + public void onBeforeLock(String distributedObjectName, Data key) { + String stringKey = serializationService.toObject(key); + AtomicInteger counter = getOrPutIfAbsent(LOCK_COUNTER, stringKey, arg -> new AtomicInteger()); + if (throwException) { + throw new RuntimeException("failed"); + } + counter.getAndIncrement(); + } + + @Override + public void init(NodeEngine nodeEngine, Properties properties) { + final LockService lockService = nodeEngine.getSharedService(LockService.SERVICE_NAME); + if (lockService != null) { + lockService.registerLockStoreConstructor(SERVICE_NAME, new LockStoreInfoConstructor()); + } + } + + @Override + public void reset() { + } + + @Override + public void shutdown(boolean terminate) { + } + } + + public static class LockStoreInfoConstructor implements ConstructorFunction { + @Override + public LockStoreInfo createNew(ObjectNamespace arg) { + return new LockStoreInfo() { + @Override + public int getBackupCount() { + return 0; + } + + @Override + public int getAsyncBackupCount() { + return 0; + } + }; + } + } + +}