Skip to content

Commit

Permalink
implement quorum for locks and fixing missing config options
Browse files Browse the repository at this point in the history
  • Loading branch information
Matko Medenjak committed Nov 30, 2016
1 parent ac996f6 commit 15973e3
Show file tree
Hide file tree
Showing 37 changed files with 1,086 additions and 179 deletions.
Expand Up @@ -41,6 +41,7 @@
import com.hazelcast.config.JoinConfig;
import com.hazelcast.config.ListConfig;
import com.hazelcast.config.ListenerConfig;
import com.hazelcast.config.LockConfig;
import com.hazelcast.config.LoginModuleConfig;
import com.hazelcast.config.ManagementCenterConfig;
import com.hazelcast.config.MapAttributeConfig;
Expand Down Expand Up @@ -150,6 +151,7 @@ private class SpringXmlConfigBuilder extends SpringXmlBuilderHelper {
private ManagedMap mapConfigManagedMap;
private ManagedMap cacheConfigManagedMap;
private ManagedMap queueManagedMap;
private ManagedMap lockManagedMap;
private ManagedMap ringbufferManagedMap;
private ManagedMap reliableTopicManagedMap;
private ManagedMap semaphoreManagedMap;
Expand All @@ -170,6 +172,7 @@ public SpringXmlConfigBuilder(ParserContext parserContext) {
this.mapConfigManagedMap = createManagedMap("mapConfigs");
this.cacheConfigManagedMap = createManagedMap("cacheConfigs");
this.queueManagedMap = createManagedMap("queueConfigs");
this.lockManagedMap = createManagedMap("lockConfigs");
this.ringbufferManagedMap = createManagedMap("ringbufferConfigs");
this.reliableTopicManagedMap = createManagedMap("reliableTopicConfigs");
this.semaphoreManagedMap = createManagedMap("semaphoreConfigs");
Expand Down Expand Up @@ -212,6 +215,8 @@ public void handleConfig(final Element element) {
handleDurableExecutor(node);
} else if ("queue".equals(nodeName)) {
handleQueue(node);
} else if ("lock".equals(nodeName)) {
handleLock(node);
} else if ("ringbuffer".equals(nodeName)) {
handleRingbuffer(node);
} else if ("reliable-topic".equals(nodeName)) {
Expand Down Expand Up @@ -586,6 +591,18 @@ public void handleSemaphore(Node node) {
semaphoreManagedMap.put(getAttribute(node, "name"), builder.getBeanDefinition());
}

public void handleLock(Node node) {
final BeanDefinitionBuilder lockConfigBuilder = createBeanBuilder(LockConfig.class);
fillAttributeValues(node, lockConfigBuilder);
for (Node childNode : childElements(node)) {
final String nodeName = cleanNodeName(childNode);
if ("quorum-ref".equals(nodeName)) {
lockConfigBuilder.addPropertyValue("quorumName", getTextContent(childNode));
}
}
lockManagedMap.put(getAttribute(node, "name"), lockConfigBuilder.getBeanDefinition());
}

public void handleRingbuffer(Node node) {
final BeanDefinitionBuilder ringbufferConfigBuilder = createBeanBuilder(RingbufferConfig.class);
fillAttributeValues(node, ringbufferConfigBuilder);
Expand Down
8 changes: 8 additions & 0 deletions hazelcast-spring/src/main/resources/hazelcast-spring-3.8.xsd
Expand Up @@ -162,6 +162,14 @@
</xs:attribute>
</xs:complexType>
</xs:element>
<xs:element name="lock" minOccurs="0" maxOccurs="unbounded">
<xs:complexType>
<xs:sequence>
<xs:element name="quorum-ref" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
<xs:attribute name="name" use="required" type="xs:string"/>
</xs:complexType>
</xs:element>
<xs:element name="ringbuffer" minOccurs="0" maxOccurs="unbounded">
<xs:complexType>
<xs:sequence>
Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.hazelcast.config.ItemListenerConfig;
import com.hazelcast.config.ListConfig;
import com.hazelcast.config.ListenerConfig;
import com.hazelcast.config.LockConfig;
import com.hazelcast.config.ManagementCenterConfig;
import com.hazelcast.config.MapAttributeConfig;
import com.hazelcast.config.MapConfig;
Expand Down Expand Up @@ -389,7 +390,7 @@ public void testMapConfig() {

@Test
public void testQueueConfig() {
QueueConfig testQConfig = config.getQueueConfig("testQ");
final QueueConfig testQConfig = config.getQueueConfig("testQ");
assertNotNull(testQConfig);
assertEquals("testQ", testQConfig.getName());
assertEquals(1000, testQConfig.getMaxSize());
Expand All @@ -398,12 +399,14 @@ public void testQueueConfig() {
ItemListenerConfig listenerConfig = testQConfig.getItemListenerConfigs().get(0);
assertEquals("com.hazelcast.spring.DummyItemListener", listenerConfig.getClassName());
assertTrue(listenerConfig.isIncludeValue());
QueueConfig qConfig = config.getQueueConfig("q");

final QueueConfig qConfig = config.getQueueConfig("q");
assertNotNull(qConfig);
assertEquals("q", qConfig.getName());
assertEquals(2500, qConfig.getMaxSize());
assertFalse(qConfig.isStatisticsEnabled());
assertEquals(100, qConfig.getEmptyQueueTtl());
assertEquals("my-quorum", qConfig.getQuorumName());

final QueueConfig queueWithStore1 = config.getQueueConfig("queueWithStore1");
assertNotNull(queueWithStore1);
Expand All @@ -430,6 +433,14 @@ public void testQueueConfig() {
assertEquals(dummyQueueStoreFactory, storeConfig4.getFactoryImplementation());
}

@Test
public void testLockConfig() {
final LockConfig lockConfig = config.getLockConfig("lock");
assertNotNull(lockConfig);
assertEquals("lock", lockConfig.getName());
assertEquals("my-quorum", lockConfig.getQuorumName());
}

@Test
public void testRingbufferConfig() {
final RingbufferConfig testRingbuffer = config.getRingbufferConfig("testRingbuffer");
Expand Down
Expand Up @@ -152,10 +152,7 @@
queue-capacity="300"
statistics-enabled="false"
/>
<hz:queue name="testQ"
max-size="1000"
>

<hz:queue name="testQ" max-size="1000">
<hz:item-listeners>
<hz:item-listener class-name="com.hazelcast.spring.DummyItemListener" include-value="true"/>
</hz:item-listeners>
Expand All @@ -165,7 +162,9 @@
backup-count="1"
async-backup-count="1"
statistics-enabled="false"
empty-queue-ttl="100"/>
empty-queue-ttl="100">
<hz:quorum-ref>my-quorum</hz:quorum-ref>
</hz:queue>

<hz:queue name="queueWithStore1">
<hz:queue-store enabled="true" class-name="com.hazelcast.spring.DummyQueueStore"/>
Expand All @@ -180,6 +179,10 @@
<hz:queue-store enabled="true" factory-implementation="dummyQueueStoreFactory"/>
</hz:queue>

<hz:lock name="lock">
<hz:quorum-ref>my-quorum</hz:quorum-ref>
</hz:lock>

<hz:ringbuffer name="testRingbuffer"
in-memory-format="OBJECT"
capacity="100"
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.hazelcast.concurrent.lock.operations.LocalLockCleanupOperation;
import com.hazelcast.concurrent.lock.operations.LockReplicationOperation;
import com.hazelcast.concurrent.lock.operations.UnlockOperation;
import com.hazelcast.config.LockConfig;
import com.hazelcast.core.DistributedObject;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.nio.serialization.Data;
Expand All @@ -34,6 +35,7 @@
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.PartitionMigrationEvent;
import com.hazelcast.spi.PartitionReplicationEvent;
import com.hazelcast.spi.QuorumAwareService;
import com.hazelcast.spi.RemoteService;
import com.hazelcast.spi.impl.PartitionSpecificRunnable;
import com.hazelcast.spi.impl.operationservice.InternalOperationService;
Expand All @@ -42,6 +44,7 @@
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.util.Clock;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.ContextMutexFactory;

import java.util.Collection;
import java.util.LinkedList;
Expand All @@ -50,16 +53,18 @@
import java.util.concurrent.ConcurrentMap;

import static com.hazelcast.spi.impl.OperationResponseHandlerFactory.createEmptyResponseHandler;
import static com.hazelcast.util.ConcurrencyUtil.getOrPutSynchronized;

@SuppressWarnings("checkstyle:methodcount")
public final class LockServiceImpl implements LockService, ManagedService, RemoteService, MembershipAwareService,
MigrationAwareService, ClientAwareService {
MigrationAwareService, ClientAwareService, QuorumAwareService {

private final NodeEngine nodeEngine;
private final LockStoreContainer[] containers;
private final ConcurrentMap<String, ConstructorFunction<ObjectNamespace, LockStoreInfo>> constructors
= new ConcurrentHashMap<String, ConstructorFunction<ObjectNamespace, LockStoreInfo>>();

private final ConcurrentMap<String, String> quorumConfigCache = new ConcurrentHashMap<String, String>();
private final ContextMutexFactory quorumConfigCacheMutexFactory = new ContextMutexFactory();
private final long maxLeaseTimeInMillis;

public LockServiceImpl(NodeEngine nodeEngine) {
Expand Down Expand Up @@ -310,4 +315,17 @@ public void clientDisconnected(String clientUuid) {
public static long getMaxLeaseTimeInMillis(HazelcastProperties hazelcastProperties) {
return hazelcastProperties.getMillis(GroupProperty.LOCK_MAX_LEASE_TIME_SECONDS);
}

@Override
public String getQuorumName(final String name) {
// we use caching here because lock operations are often and we should avoid lock config lookup
return getOrPutSynchronized(quorumConfigCache, name, quorumConfigCacheMutexFactory,
new ConstructorFunction<String, String>() {
@Override
public String createNew(String arg) {
final LockConfig lockConfig = nodeEngine.getConfig().findLockConfig(name);
return lockConfig.getQuorumName();
}
});
}
}
Expand Up @@ -23,6 +23,7 @@
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.NamedOperation;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.PartitionAwareOperation;
Expand All @@ -31,7 +32,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public abstract class AbstractLockOperation extends Operation
implements PartitionAwareOperation, IdentifiedDataSerializable {
implements PartitionAwareOperation, IdentifiedDataSerializable, NamedOperation {

public static final int ANY_THREAD = 0;

Expand Down Expand Up @@ -123,6 +124,11 @@ public final Data getKey() {
return key;
}

@Override
public String getName() {
return namespace.getObjectName();
}

@Override
public final int getFactoryId() {
return LockDataSerializerHook.F_ID;
Expand Down
Expand Up @@ -24,11 +24,12 @@
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.BackupOperation;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.impl.MutatingOperation;

import java.io.IOException;

public class AwaitBackupOperation extends AbstractLockOperation
implements BackupOperation {
implements BackupOperation, MutatingOperation {

private String originalCaller;
private String conditionId;
Expand Down
Expand Up @@ -26,11 +26,12 @@
import com.hazelcast.spi.BlockingOperation;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.impl.MutatingOperation;

import java.io.IOException;

public class AwaitOperation extends AbstractLockOperation
implements BlockingOperation, BackupAwareOperation {
implements BlockingOperation, BackupAwareOperation, MutatingOperation {

private String conditionId;
private boolean expired;
Expand Down
Expand Up @@ -23,10 +23,11 @@
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.BackupOperation;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.impl.MutatingOperation;

import java.io.IOException;

public class BeforeAwaitBackupOperation extends AbstractLockOperation implements BackupOperation {
public class BeforeAwaitBackupOperation extends AbstractLockOperation implements BackupOperation, MutatingOperation {

private String conditionId;
private String originalCaller;
Expand Down
Expand Up @@ -26,10 +26,11 @@
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.WaitNotifyKey;
import com.hazelcast.spi.impl.MutatingOperation;

import java.io.IOException;

public class BeforeAwaitOperation extends AbstractLockOperation implements Notifier, BackupAwareOperation {
public class BeforeAwaitOperation extends AbstractLockOperation implements Notifier, BackupAwareOperation, MutatingOperation {

private String conditionId;

Expand Down
Expand Up @@ -20,8 +20,9 @@
import com.hazelcast.concurrent.lock.LockStoreImpl;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.ReadonlyOperation;

public class GetLockCountOperation extends AbstractLockOperation {
public class GetLockCountOperation extends AbstractLockOperation implements ReadonlyOperation {

public GetLockCountOperation() {
}
Expand Down
Expand Up @@ -20,8 +20,9 @@
import com.hazelcast.concurrent.lock.LockStoreImpl;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.ReadonlyOperation;

public class GetRemainingLeaseTimeOperation extends AbstractLockOperation {
public class GetRemainingLeaseTimeOperation extends AbstractLockOperation implements ReadonlyOperation {

public GetRemainingLeaseTimeOperation() {
}
Expand Down
Expand Up @@ -20,8 +20,9 @@
import com.hazelcast.concurrent.lock.LockStoreImpl;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.ReadonlyOperation;

public class IsLockedOperation extends AbstractLockOperation {
public class IsLockedOperation extends AbstractLockOperation implements ReadonlyOperation {

public IsLockedOperation() {
}
Expand Down
Expand Up @@ -23,10 +23,11 @@
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.BackupOperation;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.impl.MutatingOperation;

import java.io.IOException;

public class LockBackupOperation extends AbstractLockOperation implements BackupOperation {
public class LockBackupOperation extends AbstractLockOperation implements BackupOperation, MutatingOperation {

private String originalCallerUuid;

Expand Down
Expand Up @@ -26,8 +26,9 @@
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.WaitNotifyKey;
import com.hazelcast.spi.impl.MutatingOperation;

public class LockOperation extends AbstractLockOperation implements BlockingOperation, BackupAwareOperation {
public class LockOperation extends AbstractLockOperation implements BlockingOperation, BackupAwareOperation, MutatingOperation {

public LockOperation() {
}
Expand Down
Expand Up @@ -20,8 +20,9 @@
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.BackupOperation;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.impl.MutatingOperation;

public class SignalBackupOperation extends BaseSignalOperation implements BackupOperation {
public class SignalBackupOperation extends BaseSignalOperation implements BackupOperation, MutatingOperation {

public SignalBackupOperation() {
}
Expand Down
Expand Up @@ -22,8 +22,9 @@
import com.hazelcast.spi.BackupAwareOperation;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.impl.MutatingOperation;

public class SignalOperation extends BaseSignalOperation implements BackupAwareOperation {
public class SignalOperation extends BaseSignalOperation implements BackupAwareOperation, MutatingOperation {

public SignalOperation() {
}
Expand Down
Expand Up @@ -23,10 +23,11 @@
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.BackupOperation;
import com.hazelcast.spi.ObjectNamespace;
import com.hazelcast.spi.impl.MutatingOperation;

import java.io.IOException;

public class UnlockBackupOperation extends AbstractLockOperation implements BackupOperation {
public class UnlockBackupOperation extends AbstractLockOperation implements BackupOperation, MutatingOperation {

private boolean force;
private String originalCallerUuid;
Expand Down

0 comments on commit 15973e3

Please sign in to comment.