Skip to content

Commit

Permalink
Implement client Lock idempotence (#8455)
Browse files Browse the repository at this point in the history
* Added lockReferenceId support for Lock, Map, Condition and Multimap.

* Updated the compatibility tests.

* Changed the client protocol version to SNAPSHOT version.

* Added the sonatype snapshot repository to the repositories.

* use reference id for the Condition await and beforeAwait.

Also added a sleep to ConditionAbstractTest so that all threads are guaraneteed to be in await. This is not the best way to do it but the ony way I can think of for now.

* Updated the binary file generator name.

* Removed the retry control for unlock operation. It is not possible to do this check for unlock operation.
  • Loading branch information
ihsandemir committed Jul 21, 2016
1 parent e7c3122 commit 2f6ab03
Show file tree
Hide file tree
Showing 35 changed files with 8,885 additions and 6,337 deletions.
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2008-2016, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.impl;

import java.util.concurrent.atomic.AtomicLong;

/**
* This class generates unique (per client) incrementing reference id which is used during locking related requests.
* The server side uses this id to match if any previous request with the same id was issued and shall not re-do the lock related
* operation but it shall just return the previous result. Hence, this id identifies the outstanding request sent to the server
* side for locking operations. Similarly, if the client resends the request to the server for some reason it will use the same
* reference id to make sure that the operation is not executed more than once at the server side.
*
*/
public final class ClientLockReferenceIdGenerator {
private AtomicLong referenceIdCounter = new AtomicLong();

/**
*
* @return A per client unique reference id.
*/
public long getNextReferenceId() {
return referenceIdCounter.incrementAndGet();
}
}
Expand Up @@ -181,6 +181,8 @@ public class HazelcastClientInstanceImpl implements HazelcastInstance, Serializa
private final SerializationService serializationService;
private final ClientICacheManager hazelcastCacheManager;

private final ClientLockReferenceIdGenerator lockReferenceIdGenerator;

public HazelcastClientInstanceImpl(ClientConfig config,
ClientConnectionManagerFactory clientConnectionManagerFactory,
AddressProvider externalAddressProvider) {
Expand Down Expand Up @@ -223,6 +225,8 @@ public HazelcastClientInstanceImpl(ClientConfig config,

proxyManager.init(config);
hazelcastCacheManager = new ClientICacheManager(this);

lockReferenceIdGenerator = new ClientLockReferenceIdGenerator();
}

private Diagnostics initDiagnostics(ClientConfig config) {
Expand Down Expand Up @@ -443,6 +447,7 @@ public <K, V> IMap<K, V> getMap(String name) {
@Override
public <K, V> MultiMap<K, V> getMultiMap(String name) {
return getDistributedObject(MultiMapService.SERVICE_NAME, name);

}

@Override
Expand Down Expand Up @@ -690,4 +695,7 @@ public void doShutdown() {
diagnostics.shutdown();
}

public ClientLockReferenceIdGenerator getLockReferenceIdGenerator() {
return lockReferenceIdGenerator;
}
}
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.client.proxy;

import com.hazelcast.client.impl.ClientLockReferenceIdGenerator;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ConditionAwaitCodec;
import com.hazelcast.client.impl.protocol.codec.ConditionBeforeAwaitCodec;
Expand All @@ -37,6 +38,7 @@
public class ClientConditionProxy extends PartitionSpecificClientProxy implements ICondition {

private final String conditionId;
private ClientLockReferenceIdGenerator referenceIdGenerator;

public ClientConditionProxy(ClientLockProxy clientLockProxy, String name, ClientContext ctx) {
super(LockService.SERVICE_NAME, clientLockProxy.getName());
Expand Down Expand Up @@ -76,13 +78,15 @@ public boolean await(long time, TimeUnit unit) throws InterruptedException {
}

private void beforeAwait(long threadId) {
ClientMessage request = ConditionBeforeAwaitCodec.encodeRequest(conditionId, threadId, name);
ClientMessage request = ConditionBeforeAwaitCodec
.encodeRequest(conditionId, threadId, name, referenceIdGenerator.getNextReferenceId());
invokeOnPartition(request);
}

private boolean doAwait(long time, TimeUnit unit, long threadId) throws InterruptedException {
final long timeoutInMillis = unit.toMillis(time);
ClientMessage request = ConditionAwaitCodec.encodeRequest(conditionId, threadId, timeoutInMillis, name);
ClientMessage request = ConditionAwaitCodec
.encodeRequest(conditionId, threadId, timeoutInMillis, name, referenceIdGenerator.getNextReferenceId());
ClientMessage response = invokeOnPartition(request);
return ConditionAwaitCodec.decodeResponse(response).response;
}
Expand Down Expand Up @@ -110,5 +114,9 @@ public void signalAll() {
invokeOnPartition(request);
}


@Override
protected void onInitialize() {
super.onInitialize();
referenceIdGenerator = getClient().getLockReferenceIdGenerator();
}
}
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.client.proxy;

import com.hazelcast.client.impl.ClientLockReferenceIdGenerator;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.LockForceUnlockCodec;
import com.hazelcast.client.impl.protocol.codec.LockGetLockCountCodec;
Expand All @@ -40,6 +41,8 @@
*/
public class ClientLockProxy extends PartitionSpecificClientProxy implements ILock {

private ClientLockReferenceIdGenerator referenceIdGenerator;

public ClientLockProxy(String serviceName, String objectId) {
super(serviceName, objectId);
}
Expand Down Expand Up @@ -81,12 +84,12 @@ public long getRemainingLeaseTime() {
public void lock(long leaseTime, TimeUnit timeUnit) {
checkPositive(leaseTime, "leaseTime should be positive");
ClientMessage request = LockLockCodec.encodeRequest(name,
getTimeInMillis(leaseTime, timeUnit), ThreadUtil.getThreadId());
getTimeInMillis(leaseTime, timeUnit), ThreadUtil.getThreadId(), referenceIdGenerator.getNextReferenceId());
invokeOnPartition(request);
}

public void forceUnlock() {
ClientMessage request = LockForceUnlockCodec.encodeRequest(name);
ClientMessage request = LockForceUnlockCodec.encodeRequest(name, referenceIdGenerator.getNextReferenceId());
invokeOnPartition(request);
}

Expand All @@ -98,12 +101,14 @@ public ICondition newCondition(String name) {
}

public void lock() {
ClientMessage request = LockLockCodec.encodeRequest(name, -1, ThreadUtil.getThreadId());
ClientMessage request = LockLockCodec
.encodeRequest(name, -1, ThreadUtil.getThreadId(), referenceIdGenerator.getNextReferenceId());
invokeOnPartition(request);
}

public void lockInterruptibly() throws InterruptedException {
ClientMessage request = LockLockCodec.encodeRequest(name, -1, ThreadUtil.getThreadId());
ClientMessage request = LockLockCodec
.encodeRequest(name, -1, ThreadUtil.getThreadId(), referenceIdGenerator.getNextReferenceId());
invokeOnPartitionInterruptibly(request);
}

Expand All @@ -124,13 +129,15 @@ public boolean tryLock(long timeout, TimeUnit unit, long leaseTime, TimeUnit lea
long timeoutInMillis = getTimeInMillis(timeout, unit);
long leaseTimeInMillis = getTimeInMillis(leaseTime, leaseUnit);
long threadId = ThreadUtil.getThreadId();
ClientMessage request = LockTryLockCodec.encodeRequest(name, threadId, leaseTimeInMillis, timeoutInMillis);
ClientMessage request = LockTryLockCodec
.encodeRequest(name, threadId, leaseTimeInMillis, timeoutInMillis, referenceIdGenerator.getNextReferenceId());
LockTryLockCodec.ResponseParameters resultParameters = LockTryLockCodec.decodeResponse(invokeOnPartition(request));
return resultParameters.response;
}

public void unlock() {
ClientMessage request = LockUnlockCodec.encodeRequest(name, ThreadUtil.getThreadId());
ClientMessage request = LockUnlockCodec
.encodeRequest(name, ThreadUtil.getThreadId(), referenceIdGenerator.getNextReferenceId());
invokeOnPartition(request);
}

Expand All @@ -146,4 +153,11 @@ private long getTimeInMillis(final long time, final TimeUnit timeunit) {
public String toString() {
return "ILock{" + "name='" + name + '\'' + '}';
}

@Override
protected void onInitialize() {
super.onInitialize();

referenceIdGenerator = getClient().getLockReferenceIdGenerator();
}
}

0 comments on commit 2f6ab03

Please sign in to comment.