Skip to content

Commit

Permalink
IGNITE-9761 Fixed deadlock in WAL manager - Fixes #4890.
Browse files Browse the repository at this point in the history
Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
  • Loading branch information
akalash authored and agoncharuk committed Oct 2, 2018
1 parent 4572137 commit bd07c83
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 26 deletions.
Expand Up @@ -63,10 +63,12 @@ long lastArchivedAbsoluteIndex() {
/**
* @param lastAbsArchivedIdx New value of last archived segment index.
*/
synchronized void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
this.lastAbsArchivedIdx = lastAbsArchivedIdx;
void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
synchronized (this) {
this.lastAbsArchivedIdx = lastAbsArchivedIdx;

notifyAll();
notifyAll();
}

notifyObservers(lastAbsArchivedIdx);
}
Expand Down
Expand Up @@ -219,6 +219,15 @@ public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
return lastArchivedAbsoluteIndex() >= absIdx || segmentLockStorage.lockWorkSegment(absIdx);
}

/**
* Visible for test.
*
* @param absIdx Segment absolute index. segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
*/
void lockWorkSegment(long absIdx) {
segmentLockStorage.lockWorkSegment(absIdx);
}

/**
* @param absIdx Segment absolute index.
*/
Expand Down
Expand Up @@ -17,8 +17,8 @@

package org.apache.ignite.internal.processors.cache.persistence.wal.aware;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;

/**
Expand All @@ -29,15 +29,15 @@ public class SegmentLockStorage extends SegmentObservable {
* Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may come from
* {@link FileWriteAheadLogManager.RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
*/
private Map<Long, Integer> locked = new HashMap<>();
private Map<Long, Integer> locked = new ConcurrentHashMap<>();

/**
* Check if WAL segment locked (protected from move to archive)
*
* @param absIdx Index for check reservation.
* @return {@code True} if index is locked.
*/
public synchronized boolean locked(long absIdx) {
public boolean locked(long absIdx) {
return locked.containsKey(absIdx);
}

Expand All @@ -47,12 +47,8 @@ public synchronized boolean locked(long absIdx) {
* segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
synchronized boolean lockWorkSegment(long absIdx) {
Integer cur = locked.get(absIdx);

cur = cur == null ? 1 : cur + 1;

locked.put(absIdx, cur);
boolean lockWorkSegment(long absIdx) {
locked.compute(absIdx, (idx, count) -> count == null ? 1 : count + 1);

return false;
}
Expand All @@ -61,15 +57,12 @@ synchronized boolean lockWorkSegment(long absIdx) {
* @param absIdx Segment absolute index.
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
synchronized void releaseWorkSegment(long absIdx) {
Integer cur = locked.get(absIdx);

assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx;
void releaseWorkSegment(long absIdx) {
locked.compute(absIdx, (idx, count) -> {
assert count != null && count >= 1 : "cur=" + count + ", absIdx=" + absIdx;

if (cur == 1)
locked.remove(absIdx);
else
locked.put(absIdx, cur - 1);
return count == 1 ? null : count - 1;
});

notifyObservers(absIdx);
}
Expand Down
Expand Up @@ -17,21 +17,21 @@

package org.apache.ignite.internal.processors.cache.persistence.wal.aware;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/**
* Implementation of observer-observable pattern. For handling specific changes of segment.
*/
public abstract class SegmentObservable {
/** Observers for handle changes of archived index. */
private final List<Consumer<Long>> observers = new ArrayList<>();
private final Queue<Consumer<Long>> observers = new ConcurrentLinkedQueue<>();

/**
* @param observer Observer for notification about segment's changes.
*/
synchronized void addObserver(Consumer<Long> observer) {
void addObserver(Consumer<Long> observer) {
observers.add(observer);
}

Expand All @@ -40,7 +40,7 @@ synchronized void addObserver(Consumer<Long> observer) {
*
* @param segmentId Segment which was been changed.
*/
synchronized void notifyObservers(long segmentId) {
void notifyObservers(long segmentId) {
observers.forEach(observer -> observer.accept(segmentId));
}
}
Expand Up @@ -31,6 +31,46 @@
* Test for {@link SegmentAware}.
*/
public class SegmentAwareTest extends TestCase {

/**
* Checking to avoid deadlock SegmentArchivedStorage.markAsMovedToArchive -> SegmentLockStorage.locked <->
* SegmentLockStorage.releaseWorkSegment -> SegmentArchivedStorage.onSegmentUnlocked
*
* @throws IgniteCheckedException if failed.
*/
public void testAvoidDeadlockArchiverAndLockStorage() throws IgniteCheckedException {
SegmentAware aware = new SegmentAware(10, false);

int iterationCnt = 100_000;
int segmentToHandle = 1;

IgniteInternalFuture archiverThread = GridTestUtils.runAsync(() -> {
int i = iterationCnt;

while (i-- > 0) {
try {
aware.markAsMovedToArchive(segmentToHandle);
}
catch (IgniteInterruptedCheckedException e) {
throw new RuntimeException(e);
}
}
});

IgniteInternalFuture lockerThread = GridTestUtils.runAsync(() -> {
int i = iterationCnt;

while (i-- > 0) {
aware.lockWorkSegment(segmentToHandle);

aware.releaseWorkSegment(segmentToHandle);
}
});

archiverThread.get();
lockerThread.get();
}

/**
* Waiting finished when work segment is set.
*/
Expand Down Expand Up @@ -435,7 +475,7 @@ public void testFinishWaitSegmentToCompress_WhenInterruptWasCall() throws Ignite
public void testLastCompressedIdxProperOrdering() throws IgniteInterruptedCheckedException {
SegmentAware aware = new SegmentAware(10, true);

for (int i = 0; i < 5 ; i++) {
for (int i = 0; i < 5; i++) {
aware.setLastArchivedAbsoluteIndex(i);
aware.waitNextSegmentToCompress();
}
Expand Down

0 comments on commit bd07c83

Please sign in to comment.