Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,38 @@ public class ThreadUtils {
/** System line separator. */
private static final String NL = System.lineSeparator();

/**
* Performs thread dump and prints all available info to the given log with {@code WARN} or {@code ERROR} logging level depending on
* {@code isErrorLevel} parameter. If there's no thread with a given ID, or ID is invalid, then nothing is printed.
*
* @param log Logger.
* @param threadId ID of a thread to dump.
* @param isErrorLevel {@code true} if thread dump must be printed with {@code ERROR} logging level, {@code false} if thread dump must
* be printed with {@code WARN} logging level.
*/
public static void dumpThread(IgniteLogger log, long threadId, boolean isErrorLevel) {
if (threadId <= 0) {
return;
}

// We don't really need a full stack, 64 as a default should be enough for debugging.
int maxStackElements = 64;

ThreadInfo info = ManagementFactory.getThreadMXBean().getThreadInfo(threadId, maxStackElements);
if (info == null) {
return;
}
Comment thread
ibessonov marked this conversation as resolved.

StringBuilder sb = new StringBuilder(THREAD_DUMP_MSG)
.append(THREAD_DUMP_FMT.format(Instant.ofEpochMilli(System.currentTimeMillis())))
.append(NL);

printThreadInfo(info, sb, Set.of());
sb.append(NL);

logMessage(log, sb.toString(), isErrorLevel);
}

/**
* Performs thread dump and prints all available info to the given log
* with WARN or ERROR logging level depending on {@code isErrorLevel} parameter.
Expand Down Expand Up @@ -91,8 +123,9 @@ public static void dumpThreads(IgniteLogger log, @Nullable String message, boole

sb.append(NL);

if (info.getLockedSynchronizers() != null && info.getLockedSynchronizers().length > 0) {
printSynchronizersInfo(info.getLockedSynchronizers(), sb);
LockInfo[] lockedSynchronizers = info.getLockedSynchronizers();
if (lockedSynchronizers != null && lockedSynchronizers.length > 0) {
printSynchronizersInfo(lockedSynchronizers, sb);

sb.append(NL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,25 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.thread.ThreadUtils;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;

/**
* Lock state structure is as follows.
* <pre>
* +----------------+---------------+---------+----------+
* | WRITE WAIT CNT | READ WAIT CNT | TAG | LOCK CNT |
* +----------------+---------------+---------+----------+
* | 2 bytes | 2 bytes | 2 bytes | 2 bytes |
* +----------------+---------------+---------+----------+
* +----------------+---------------+---------+----------+----------+
Comment thread
ibessonov marked this conversation as resolved.
* | WRITE WAIT CNT | READ WAIT CNT | TAG | LOCK CNT | OWNER ID |
* +----------------+---------------+---------+----------+----------+
* | 2 bytes | 2 bytes | 2 bytes | 2 bytes | 8 bytes |
* +----------------+---------------+---------+----------+----------+
* </pre>
*/
public class OffheapReadWriteLock {
private final IgniteLogger log = Loggers.forClass(OffheapReadWriteLock.class);

/** Default concurrency level for the lock. */
public static final int DEFAULT_CONCURRENCY_LEVEL = 128;

Expand All @@ -59,10 +64,16 @@ public class OffheapReadWriteLock {
public static final int TAG_LOCK_ALWAYS = -1;

/** Lock size. */
public static final int LOCK_SIZE = 8;
public static final int LOCK_SIZE = Long.BYTES * 2;

/** Offset to the thread ID of a thread that currently holds write lock. */
private static final int OWNER_ID_OFFSET = Long.BYTES;

/** Placeholder value for when no one holds a write lock. See {@link #OWNER_ID_OFFSET}. */
private static final long NO_OWNER_ID = -1L;

/** Maximum number of waiting threads, read or write. */
public static final int MAX_WAITERS = 0xFFFF;
private static final int MAX_WAITERS = 0xFFFF;

/** Striped locks array. */
private final ReentrantLock[] locks;
Expand Down Expand Up @@ -142,6 +153,7 @@ public void init(long lock, int tag) {
assert tag != 0;

GridUnsafe.putLong(lock, (long) tag << 16);
GridUnsafe.putLong(lock + OWNER_ID_OFFSET, NO_OWNER_ID);
}

/**
Expand Down Expand Up @@ -241,8 +253,14 @@ public void readUnlock(long lock) {
public boolean tryWriteLock(long lock, int tag) {
long state = GridUnsafe.getLongVolatile(null, lock);

return checkTag(state, tag) && canWriteLock(state)
boolean success = checkTag(state, tag) && canWriteLock(state)
&& GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -1, 0, 0));

if (success) {
setOwnerId(lock);
}

return success;
}

/**
Expand All @@ -265,6 +283,8 @@ public boolean writeLock(long lock, int tag) {

if (canWriteLock(state)) {
if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -1, 0, 0))) {
setOwnerId(lock);

return true;
} else {
// Retry CAS, do not count as spin cycle.
Expand Down Expand Up @@ -321,11 +341,14 @@ public void writeUnlock(long lock, int tag) {
while (true) {
long state = GridUnsafe.getLongVolatile(null, lock);

if (lockCount(state) != -1) {
long ownerId = getOwnerId(lock);
if (lockCount(state) != -1 || ownerId != NO_OWNER_ID && ownerId != Thread.currentThread().getId()) {
throw new IllegalMonitorStateException("Attempted to release write lock while not holding it "
+ "[lock=" + hexLong(lock) + ", state=" + hexLong(state) + ']');
}

clearOwnerId(lock);
Comment thread
ibessonov marked this conversation as resolved.

updated = releaseWithTag(state, tag);

assert updated != 0;
Expand Down Expand Up @@ -399,6 +422,8 @@ private void signalNextWaiter(int writeWaitCnt, int idx) {

if (lockCount(state) == 1) {
if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -2, 0, 0))) {
setOwnerId(lock);

return true;
} else {
// Retry CAS, do not count as spin cycle.
Expand All @@ -424,6 +449,8 @@ private void signalNextWaiter(int writeWaitCnt, int idx) {

if (lockCount(state) == 1) {
if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -2, 0, 0))) {
setOwnerId(lock);

return true;
} else {
continue;
Expand Down Expand Up @@ -520,6 +547,8 @@ private boolean waitAcquireWriteLock(long lock, int lockIdx, int tag) {
long updated = updateState(state, -1, 0, -1);

if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
setOwnerId(lock);

return true;
}
} else {
Expand Down Expand Up @@ -560,6 +589,10 @@ private void awaitCondition(
long passedNanos = System.nanoTime() - startTimeNanos;

if (passedNanos >= timeoutNanos) {
long ownerId = getOwnerId(lock);

ThreadUtils.dumpThread(log, ownerId, true);
Comment thread
ibessonov marked this conversation as resolved.

//noinspection InfiniteLoopStatement
while (true) {
long state = GridUnsafe.getLongVolatile(null, lock);
Expand All @@ -571,7 +604,8 @@ private void awaitCondition(
"tag", hexInt(tag), false,
"idx", lockIdx, false,
"cond", waitCond.toString(), false,
"timeout", TimeUnit.NANOSECONDS.toMillis(timeoutNanos) + "ms", false
"timeout", TimeUnit.NANOSECONDS.toMillis(timeoutNanos) + "ms", false,
"ownerId", ownerId, false
));
Comment thread
ibessonov marked this conversation as resolved.
}
}
Expand Down Expand Up @@ -772,4 +806,16 @@ private void updateWritersWaitCount(long lock, ReentrantLock lockObj, int delta)
}
}
}

private static void setOwnerId(long lock) {
GridUnsafe.putLongVolatile(null, lock + OWNER_ID_OFFSET, Thread.currentThread().getId());
}

private static void clearOwnerId(long lock) {
GridUnsafe.putLongVolatile(null, lock + OWNER_ID_OFFSET, NO_OWNER_ID);
}

private static long getOwnerId(long lock) {
return GridUnsafe.getLongVolatile(null, lock + OWNER_ID_OFFSET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@
* <p/>
* When page is allocated and is in use:
* <pre>
* +--------+--------+--------+--------+---------------------------+
* |8 bytes |8 bytes |8 bytes |8 bytes | PAGE_SIZE |
* +--------+--------+--------+--------+---------------------------+
* | Marker |Page ID |Pin CNT | Lock | Page data |
* +--------+--------+--------+--------+---------------------------+
* +--------+--------+---------+---------------------------+
* |8 bytes |8 bytes |16 bytes | PAGE_SIZE |
* +--------+--------+---------+---------------------------+
* | Marker |Page ID | Lock | Page data |
* +--------+--------+---------+---------------------------+
* </pre>
*
* <p>Note that first 8 bytes of page header are used either for page marker or for next relative pointer depending
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,27 @@
package org.apache.ignite.internal.pagememory.persistence;

import static org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.INVALID_REL_PTR;
import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
import static org.apache.ignite.internal.util.GridUnsafe.decrementAndGetInt;
import static org.apache.ignite.internal.util.GridUnsafe.getInt;
import static org.apache.ignite.internal.util.GridUnsafe.getIntVolatile;
import static org.apache.ignite.internal.util.GridUnsafe.getLong;
import static org.apache.ignite.internal.util.GridUnsafe.putInt;
import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
import static org.apache.ignite.internal.util.GridUnsafe.putLong;
import static org.apache.ignite.internal.util.GridUnsafe.putLongVolatile;
import static org.apache.ignite.internal.util.IgniteUtils.isPow2;

import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.OffheapReadWriteLock;

/**
* Helper class for working with the page header that is stored in memory for {@link PersistentPageMemory}.
*
* <p>Page header has the following structure:</p>
* <pre>
* +-----------------+---------------------+--------+--------+---------+----------+----------+----------------------+
* | 8 bytes | 4 bytes |4 bytes |8 bytes |4 bytes |4 bytes |8 bytes | 8 bytes |
* | 8 bytes | 4 bytes |4 bytes |8 bytes |4 bytes |4 bytes |16 bytes | 8 bytes |
* +-----------------+---------------------+--------+--------+---------+----------+----------+----------------------+
* |Marker/Timestamp |Partition generation |Flags |Page ID |Group ID |Pin count |Lock data |Checkpoint tmp buffer |
* +-----------------+---------------------+--------+--------+---------+----------+----------+----------------------+
Expand All @@ -62,32 +64,32 @@ public class PageHeader {
/** Unknown partition generation. */
static final int UNKNOWN_PARTITION_GENERATION = -1;

/** 8b Marker/timestamp, 4b Partition generation, 4b flags, 8b Page ID, 4b Group ID, 4b Pin count, 8b Lock, 8b Temporary buffer. */
public static final int PAGE_OVERHEAD = 48;

/** Marker or timestamp offset. */
private static final int MARKER_OR_TIMESTAMP_OFFSET = 0;

/** Partition generation offset. */
private static final int PARTITION_GENERATION_OFFSET = 8;
private static final int PARTITION_GENERATION_OFFSET = MARKER_OR_TIMESTAMP_OFFSET + Long.BYTES;

/** Flags offset. */
private static final int FLAGS_OFFSET = 12;
private static final int FLAGS_OFFSET = PARTITION_GENERATION_OFFSET + Integer.BYTES;

/** Page ID offset. */
private static final int PAGE_ID_OFFSET = 16;
private static final int PAGE_ID_OFFSET = FLAGS_OFFSET + Integer.BYTES;

/** Page group ID offset. */
private static final int PAGE_GROUP_ID_OFFSET = 24;
private static final int PAGE_GROUP_ID_OFFSET = PAGE_ID_OFFSET + Long.BYTES;

/** Page pin counter offset. */
private static final int PAGE_PIN_CNT_OFFSET = 28;
private static final int PAGE_PIN_CNT_OFFSET = PAGE_GROUP_ID_OFFSET + Integer.BYTES;

/** Page lock offset. */
public static final int PAGE_LOCK_OFFSET = 32;
public static final int PAGE_LOCK_OFFSET = PAGE_PIN_CNT_OFFSET + Integer.BYTES;

/** Page temp copy buffer relative pointer offset. */
private static final int PAGE_TMP_BUF_OFFSET = 40;
private static final int PAGE_TMP_BUF_OFFSET = PAGE_LOCK_OFFSET + OffheapReadWriteLock.LOCK_SIZE;

/** 8b Marker/timestamp, 4b Partition generation, 4b flags, 8b Page ID, 4b Group ID, 4b Pin count, 16b Lock, 8b Temporary buffer. */
public static final int PAGE_OVERHEAD = PAGE_TMP_BUF_OFFSET + Long.BYTES;
Comment thread
ibessonov marked this conversation as resolved.

/**
* Initializes the header of the page.
Expand All @@ -109,7 +111,7 @@ public static void initNew(long absPtr) {
* @param absPtr Absolute pointer.
*/
public static boolean dirty(long absPtr) {
return flag(absPtr, DIRTY_FLAG, false);
return flag(absPtr, DIRTY_FLAG);
}

/**
Expand All @@ -120,35 +122,34 @@ public static boolean dirty(long absPtr) {
* @return Previous value of dirty flag.
*/
public static boolean dirty(long absPtr, boolean dirty) {
return flag(absPtr, DIRTY_FLAG, dirty, false);
return flag(absPtr, DIRTY_FLAG, dirty);
}

/**
* Reads the value of a header validity flag. Does it using a volatile memory access.
*/
public static boolean headerIsValid(long absPtr) {
return flag(absPtr, HEADER_IS_VALID_FLAG, true);
return flag(absPtr, HEADER_IS_VALID_FLAG);
}

/**
* Updates the value of a header validity flag. Does it using a volatile memory access.
*/
public static void headerIsValid(long absPtr, boolean valid) {
flag(absPtr, HEADER_IS_VALID_FLAG, valid, true);
flag(absPtr, HEADER_IS_VALID_FLAG, valid);
}

/**
* Returns flag value.
*
* @param absPtr Absolute pointer.
* @param flagMask Flag mask.
* @param volatileAccess Whether to use volatile memory access.
*/
private static boolean flag(long absPtr, int flagMask, boolean volatileAccess) {
private static boolean flag(long absPtr, int flagMask) {
assert (flagMask & 0xFFFFFF) == 0 : Integer.toHexString(flagMask);
assert Long.bitCount(flagMask) == 1 : Integer.toHexString(flagMask);
assert isPow2(flagMask) : Integer.toHexString(flagMask);

int flags = volatileAccess ? getIntVolatile(null, absPtr + FLAGS_OFFSET) : getInt(absPtr + FLAGS_OFFSET);
int flags = getIntVolatile(null, absPtr + FLAGS_OFFSET);

return (flags & flagMask) != 0;
}
Expand All @@ -159,30 +160,27 @@ private static boolean flag(long absPtr, int flagMask, boolean volatileAccess) {
* @param absPtr Absolute pointer.
* @param flagMask Flag mask.
* @param set New flag value.
* @param volatileAccess Whether to use volatile memory access.
* @return Previous flag value.
*/
private static boolean flag(long absPtr, int flagMask, boolean set, boolean volatileAccess) {
private static boolean flag(long absPtr, int flagMask, boolean set) {
assert (flagMask & 0xFFFFFF) == 0 : Integer.toHexString(flagMask);
assert Long.bitCount(flagMask) == 1 : Integer.toHexString(flagMask);
assert isPow2(flagMask) : Integer.toHexString(flagMask);

int flags = volatileAccess ? getIntVolatile(null, absPtr + FLAGS_OFFSET) : getInt(absPtr + FLAGS_OFFSET);
while (true) {
int flags = getIntVolatile(null, absPtr + FLAGS_OFFSET);

boolean was = (flags & flagMask) != 0;
boolean was = (flags & flagMask) != 0;

if (set) {
flags |= flagMask;
} else {
flags &= ~flagMask;
}
if (was == set) {
return was;
}

if (volatileAccess) {
putIntVolatile(null, absPtr + FLAGS_OFFSET, flags);
} else {
putInt(absPtr + FLAGS_OFFSET, flags);
}
int newFlags = set ? (flags | flagMask) : (flags & ~flagMask);

return was;
if (compareAndSwapInt(null, absPtr + FLAGS_OFFSET, flags, newFlags)) {
return was;
}
}
}

/**
Expand Down