Skip to content

Commit

Permalink
Expire IMap key before locking
Browse files Browse the repository at this point in the history
Introduces SPI for services that need to
execute some code before locking
a key.

Fixes hazelcast#13272
  • Loading branch information
vbekiaris committed Apr 23, 2019
1 parent b92c1d8 commit 0a2d959
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 36 deletions.
Expand Up @@ -24,6 +24,7 @@
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.nio.serialization.impl.Versioned;
import com.hazelcast.spi.LockInterceptorService;
import com.hazelcast.spi.NamedOperation;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.Operation;
Expand Down Expand Up @@ -122,6 +123,14 @@ protected final void setReferenceCallId(long refCallId) {
this.referenceCallId = refCallId;
}

protected final void interceptLockOperation() {
// if service is a LockInterceptorService, notify it a key is about to be locked
Object targetService = getNodeEngine().getService(namespace.getServiceName());
if (targetService instanceof LockInterceptorService) {
((LockInterceptorService) targetService).onBeforeLock(namespace.getObjectName(), key);
}
}

@Override
public String getServiceName() {
return LockServiceImpl.SERVICE_NAME;
Expand Down
Expand Up @@ -41,6 +41,7 @@ public LockBackupOperation(ObjectNamespace namespace, Data key, long threadId, l

@Override
public void run() throws Exception {
interceptLockOperation();
LockStoreImpl lockStore = getLockStore();
response = lockStore.lock(key, originalCallerUuid, threadId, getReferenceCallId(), leaseTime);
}
Expand Down
Expand Up @@ -45,6 +45,7 @@ public LockOperation(ObjectNamespace namespace, Data key, long threadId, long le

@Override
public void run() throws Exception {
interceptLockOperation();
final boolean lockResult = getLockStore().lock(key, getCallerUuid(), threadId, getReferenceCallId(), leaseTime);
response = lockResult;

Expand Down
14 changes: 13 additions & 1 deletion hazelcast/src/main/java/com/hazelcast/map/impl/MapService.java
Expand Up @@ -20,13 +20,16 @@
import com.hazelcast.core.DistributedObject;
import com.hazelcast.internal.cluster.ClusterStateListener;
import com.hazelcast.map.impl.event.MapEventPublishingService;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.monitor.LocalMapStats;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.ClientAwareService;
import com.hazelcast.spi.DistributedObjectNamespace;
import com.hazelcast.spi.EventFilter;
import com.hazelcast.spi.EventPublishingService;
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.FragmentedMigrationAwareService;
import com.hazelcast.spi.LockInterceptorService;
import com.hazelcast.spi.ManagedService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.NotifiableEventListener;
Expand Down Expand Up @@ -75,7 +78,8 @@
public class MapService implements ManagedService, FragmentedMigrationAwareService,
TransactionalService, RemoteService, EventPublishingService<Object, ListenerAdapter>,
PostJoinAwareService, SplitBrainHandlerService, ReplicationSupportingService, StatisticsAwareService<LocalMapStats>,
PartitionAwareService, ClientAwareService, QuorumAwareService, NotifiableEventListener, ClusterStateListener {
PartitionAwareService, ClientAwareService, QuorumAwareService, NotifiableEventListener, ClusterStateListener,
LockInterceptorService<Data> {

public static final String SERVICE_NAME = "hz:impl:mapService";

Expand Down Expand Up @@ -247,6 +251,14 @@ public void onClusterStateChange(ClusterState newState) {
mapServiceContext.onClusterStateChange(newState);
}

@Override
public void onBeforeLock(String distributedObjectName, Data key) {
int partitionId = mapServiceContext.getNodeEngine().getPartitionService().getPartitionId(key);
RecordStore recordStore = mapServiceContext.getRecordStore(partitionId, distributedObjectName);
// we have no use for the return value, invoked just for the side-effects
recordStore.getRecordOrNull(key);
}

public static ObjectNamespace getObjectNamespace(String mapName) {
return new DistributedObjectNamespace(SERVICE_NAME, mapName);
}
Expand Down
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.spi;

/**
* Interface to be implemented by services that need to intercept lock operation
* for distributed objects it manages.
*
* @param <T> type of key
*/
public interface LockInterceptorService<T> {

void onBeforeLock(String distributedObjectName, T key);

}
Expand Up @@ -16,58 +16,50 @@

package com.hazelcast.map.impl.eviction;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.util.concurrent.TimeUnit;

import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizeConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.concurrent.TimeUnit;

@RunWith(HazelcastSerialClassRunner.class)
@Category({ QuickTest.class, ParallelTest.class })
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
public class ExpirationManagerTimeoutTest extends HazelcastTestSupport {

@Ignore("https://github.com/hazelcast/hazelcast/issues/13272")
@Test
public void afterShortExpirationEntryShouldBeAway() throws InterruptedException {
final String KEY = "key";

Config hConfig = new Config().setInstanceName("instance")
Config hConfig = new Config()
.addMapConfig(new MapConfig().setName("test")
.setMaxSizeConfig(new MaxSizeConfig(200, MaxSizeConfig.MaxSizePolicy.FREE_HEAP_SIZE))
.setTimeToLiveSeconds(20));
final HazelcastInstance node = createHazelcastInstance(hConfig);
try {
IMap<String, String> map = node.getMap("test");
/**
* after 1 second entry should be evicted
*/
// after 1 second entry should be evicted
map.put(KEY, "value", 1, TimeUnit.SECONDS);
/**
* short time after adding it to the map, all ok
*/
// short time after adding it to the map, all ok
map.lock(KEY);
Object object = map.get(KEY);
map.unlock(KEY);
assertNotNull(object);

Thread.sleep(1200);
/**
* More than one second after adding it, now it should be away
*/

// more than one second after adding it, now it should be away
map.lock(KEY);
object = map.get(KEY);
map.unlock(KEY);
Expand All @@ -77,33 +69,26 @@ public void afterShortExpirationEntryShouldBeAway() throws InterruptedException
}
}

@Ignore("https://github.com/hazelcast/hazelcast/issues/13272")
@Test
public void afterLongerExpirationEntryShouldBeAway() throws InterruptedException {
final String KEY = "key";

Config hConfig = new Config().setInstanceName("instance")
Config hConfig = new Config()
.addMapConfig(new MapConfig().setName("test")
.setMaxSizeConfig(new MaxSizeConfig(200, MaxSizeConfig.MaxSizePolicy.FREE_HEAP_SIZE))
.setTimeToLiveSeconds(20));
final HazelcastInstance node = createHazelcastInstance(hConfig);
try {
IMap<String, String> map = node.getMap("test");
/**
* after 1 second entry should be evicted
*/
// after 1 second entry should be evicted
map.put(KEY, "value", 1, TimeUnit.SECONDS);
/**
* short time after adding it to the map, all ok
*/
// short time after adding it to the map, all ok
map.lock(KEY);
Object object = map.get(KEY);
map.unlock(KEY);
assertNotNull(object);
Thread.sleep(3600);
/**
* More than one second after adding it, now it should be away
*/
// More than one second after adding it, now it should be away
map.lock(KEY);
object = map.get(KEY);
map.unlock(KEY);
Expand Down
@@ -0,0 +1,159 @@
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.spi;

import com.hazelcast.concurrent.lock.LockProxySupport;
import com.hazelcast.concurrent.lock.LockService;
import com.hazelcast.concurrent.lock.LockStoreInfo;
import com.hazelcast.config.Config;
import com.hazelcast.config.ServiceConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.util.ConstructorFunction;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import static com.hazelcast.util.ConcurrencyUtil.getOrPutIfAbsent;
import static org.junit.Assert.assertEquals;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
public class LockInterceptorServiceTest extends HazelcastTestSupport {

private static final ConcurrentMap<String, AtomicInteger> LOCK_COUNTER = new ConcurrentHashMap<>();

@Before
public void setup() {
LOCK_COUNTER.clear();
}

@Test
public void testLockInterceptorServiceIsConsulted() {
LockInterceptingService implementation = new LockInterceptingService(false);

testLockingInterceptor(implementation);

assertLockCount(1);
}

@Test
public void testObjectIsNotLocked_whenLockInterceptorThrowsException() {
LockInterceptingService implementation = new LockInterceptingService(true);

testLockingInterceptor(implementation);

assertLockCount(0);
}

private void testLockingInterceptor(LockInterceptingService implementation) {
Config config = new Config();
config.getServicesConfig().addServiceConfig(new ServiceConfig()
.setEnabled(true)
.setName(LockInterceptingService.SERVICE_NAME)
.setImplementation(implementation));

HazelcastInstance member = createHazelcastInstance(config);
NodeEngine nodeEngine = getNodeEngineImpl(member);
implementation.serializationService = getSerializationService(member);

LockProxySupport lockProxySupport = new LockProxySupport(
new DistributedObjectNamespace(LockInterceptingService.SERVICE_NAME, "test-object"), 10000);

for (int i = 0; i < 100; i++) {
try {
Data key = getSerializationService(member).toData("key" + i);
lockProxySupport.lock(nodeEngine, key);
} catch (RuntimeException e) {
ignore(e);
}
}
}

private void assertLockCount(int expectedCount) {
for (int i = 0; i < 100; i++) {
assertEquals(expectedCount, LOCK_COUNTER.get("key" + i).get());
}
}

public static class LockInterceptingService implements LockInterceptorService<Data>, ManagedService {

public static final String SERVICE_NAME = "test-lock-intercepting-service";

private final boolean throwException;
private volatile SerializationService serializationService;

public LockInterceptingService(boolean throwException) {
this.throwException = throwException;
}

@Override
public void onBeforeLock(String distributedObjectName, Data key) {
String stringKey = serializationService.toObject(key);
AtomicInteger counter = getOrPutIfAbsent(LOCK_COUNTER, stringKey, arg -> new AtomicInteger());
if (throwException) {
throw new RuntimeException("failed");
}
counter.getAndIncrement();
}

@Override
public void init(NodeEngine nodeEngine, Properties properties) {
final LockService lockService = nodeEngine.getSharedService(LockService.SERVICE_NAME);
if (lockService != null) {
lockService.registerLockStoreConstructor(SERVICE_NAME, new LockStoreInfoConstructor());
}
}

@Override
public void reset() {
}

@Override
public void shutdown(boolean terminate) {
}
}

public static class LockStoreInfoConstructor implements ConstructorFunction<ObjectNamespace, LockStoreInfo> {
@Override
public LockStoreInfo createNew(ObjectNamespace arg) {
return new LockStoreInfo() {
@Override
public int getBackupCount() {
return 0;
}

@Override
public int getAsyncBackupCount() {
return 0;
}
};
}
}

}

0 comments on commit 0a2d959

Please sign in to comment.