Skip to content

Commit

Permalink
Backport IGNITE-5613 into 1.8.x
Browse files Browse the repository at this point in the history
IGNITE-5613 - Fixed deadlock on sequence update inside transaction

(cherry picked from commit 7db925c)
  • Loading branch information
agoncharuk authored and symbicator committed Oct 27, 2017
1 parent df68332 commit 1488391
Showing 1 changed file with 64 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,16 @@
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
Expand All @@ -45,8 +42,7 @@
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;

Expand Down Expand Up @@ -88,29 +84,29 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc

/** Local value of sequence. */
@GridToStringInclude(sensitive = true)
private long locVal;
private volatile long locVal;

/** Upper bound of local counter. */
private long upBound;

/** Sequence batch size. */
private volatile int batchSize;

/** Synchronization lock. */
private final Lock lock = new ReentrantLock();
/** Synchronization lock for local value updates. */
private final Lock localUpdate = new ReentrantLock();

/** Await condition. */
private Condition cond = lock.newCondition();
/** Synchronization for distributed sequence update. Acquired by threads with free topology (not in TX). */
private final ReentrantLock distUpdateFreeTop = new ReentrantLock();

/** Synchronization for distributed sequence update. Acquired by threads with locked topology (inside TX). */
private final ReentrantLock distUpdateLockedTop = new ReentrantLock();

/** Callable for execution {@link #incrementAndGet} operation in async and sync mode. */
private final Callable<Long> incAndGetCall = internalUpdate(1, true);

/** Callable for execution {@link #getAndIncrement} operation in async and sync mode. */
private final Callable<Long> getAndIncCall = internalUpdate(1, false);

/** Add and get cache call guard. */
private final AtomicBoolean updateGuard = new AtomicBoolean();

/**
* Empty constructor required by {@link Externalizable}.
*/
Expand Down Expand Up @@ -162,14 +158,7 @@ public GridCacheAtomicSequenceImpl(String name,
@Override public long get() {
checkRemoved();

lock.lock();

try {
return locVal;
}
finally {
lock.unlock();
}
return locVal;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -232,154 +221,51 @@ private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean

assert l > 0;

lock.lock();
localUpdate.lock();

try {
// If reserved range isn't exhausted.
if (locVal + l <= upBound) {
long curVal = locVal;
long locVal0 = locVal;

locVal += l;
if (locVal0 + l <= upBound) {
locVal = locVal0 + l;

return updated ? locVal : curVal;
return updated ? locVal0 + l : locVal0;
}
}
finally {
lock.unlock();
localUpdate.unlock();
}

if (updateCall == null)
updateCall = internalUpdate(l, updated);

while (true) {
if (updateGuard.compareAndSet(false, true)) {
try {
try {
return updateCall.call();
}
catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
throw e;
}
catch (Exception e) {
throw new IgniteCheckedException(e);
}
}
finally {
lock.lock();

try {
updateGuard.set(false);

cond.signalAll();
}
finally {
lock.unlock();
}
}
}
else {
lock.lock();

try {
while (locVal >= upBound && updateGuard.get())
U.await(cond, 500, MILLISECONDS);
AffinityTopologyVersion lockedVer = ctx.shared().lockedTopologyVersion(null);

checkRemoved();

// If reserved range isn't exhausted.
if (locVal + l <= upBound) {
long curVal = locVal;

locVal += l;
// We need two separate locks here because two independent threads may attempt to update the sequence
// simultaneously, one thread with locked topology and the other with unlocked topology.
// We cannot use the same lock for both cases because it leads to a deadlock when the free-topology thread
// waits for a topology change and the locked-topology thread waits to acquire the lock.
// If a thread has locked topology, it must bypass sync with non-locked threads, but at the same time
// we do not want multiple threads to attempt to run identical cache updates.
ReentrantLock distLock = lockedVer == null ? distUpdateFreeTop : distUpdateLockedTop;

return updated ? locVal : curVal;
}
}
finally {
lock.unlock();
}
}
}
}

/**
* Asynchronous sequence update operation. Will add given amount to the sequence value.
*
* @param l Increment amount.
* @param updateCall Cache call that will update sequence reservation count in accordance with l.
* @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
* prior to update.
* @return Future indicating sequence value.
* @throws IgniteCheckedException If update failed.
*/
@SuppressWarnings("SignalWithoutCorrespondingAwait")
private IgniteInternalFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated)
throws IgniteCheckedException {
checkRemoved();

A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);

lock.lock();
distLock.lock();

try {
// If reserved range isn't exhausted.
if (locVal + l <= upBound) {
long curVal = locVal;
if (updateCall == null)
updateCall = internalUpdate(l, updated);

locVal += l;

return new GridFinishedFuture<>(updated ? locVal : curVal);
try {
return updateCall.call();
}
}
finally {
lock.unlock();
}

if (updateCall == null)
updateCall = internalUpdate(l, updated);

while (true) {
if (updateGuard.compareAndSet(false, true)) {
try {
// This call must be outside lock.
return ctx.closures().callLocalSafe(updateCall, true);
}
finally {
lock.lock();

try {
updateGuard.set(false);

cond.signalAll();
}
finally {
lock.unlock();
}
}
catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
throw e;
}
else {
lock.lock();

try {
while (locVal >= upBound && updateGuard.get())
U.await(cond, 500, MILLISECONDS);

checkRemoved();

// If reserved range isn't exhausted.
if (locVal + l <= upBound) {
long curVal = locVal;

locVal += l;

return new GridFinishedFuture<>(updated ? locVal : curVal);
}
}
finally {
lock.unlock();
}
catch (Exception e) {
throw new IgniteCheckedException(e);
}
}
finally {
distLock.unlock();
}
}

/** Get local batch size for this sequence.
Expand All @@ -398,13 +284,13 @@ private IgniteInternalFuture<Long> internalUpdateAsync(long l, @Nullable Callabl
@Override public void batchSize(int size) {
A.ensure(size > 0, " Batch size can't be less then 0: " + size);

lock.lock();
localUpdate.lock();

try {
batchSize = size;
}
finally {
lock.unlock();
localUpdate.unlock();
}
}

Expand Down Expand Up @@ -486,6 +372,8 @@ private IllegalStateException removedError() {
private Callable<Long> internalUpdate(final long l, final boolean updated) {
return retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
assert distUpdateFreeTop.isHeldByCurrentThread() || distUpdateLockedTop.isHeldByCurrentThread();

try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seq = seqView.get(key);

Expand All @@ -497,48 +385,39 @@ private Callable<Long> internalUpdate(final long l, final boolean updated) {

long newUpBound;

lock.lock();

try {
curLocVal = locVal;
curLocVal = locVal;

// If local range was already reserved in another thread.
if (locVal + l <= upBound) {
long retVal = locVal;
// If local range was already reserved in another thread.
if (curLocVal + l <= upBound) {
locVal = curLocVal + l;

locVal += l;

return updated ? locVal : retVal;
}
return updated ? curLocVal + l : curLocVal;
}

long curGlobalVal = seq.get();
long curGlobalVal = seq.get();

long newLocVal;
long newLocVal;

/* We should use offset because we already reserved left side of range.*/
long off = batchSize > 1 ? batchSize - 1 : 1;
/* We should use offset because we already reserved left side of range.*/
long off = batchSize > 1 ? batchSize - 1 : 1;

// Calculate new values for local counter, global counter and upper bound.
if (curLocVal + l >= curGlobalVal) {
newLocVal = curLocVal + l;
// Calculate new values for local counter, global counter and upper bound.
if (curLocVal + l >= curGlobalVal) {
newLocVal = curLocVal + l;

newUpBound = newLocVal + off;
}
else {
newLocVal = curGlobalVal;
newUpBound = newLocVal + off;
}
else {
newLocVal = curGlobalVal;

newUpBound = newLocVal + off;
}
newUpBound = newLocVal + off;
}

locVal = newLocVal;
upBound = newUpBound;
locVal = newLocVal;
upBound = newUpBound;

if (updated)
curLocVal = newLocVal;
}
finally {
lock.unlock();
}
if (updated)
curLocVal = newLocVal;

// Global counter must be greater than the reserved upper bound.
seq.set(newUpBound + 1);
Expand Down

0 comments on commit 1488391

Please sign in to comment.