Skip to content
Permalink
Browse files
IGNITE-16935 [Native Persistence 3.0] Porting a checkpoint and related code, part 3 (#806)
  • Loading branch information
tkalkirill committed May 18, 2022
1 parent 5bb2548 commit 0b2541c3bd00541a5137a65282a1f2a704beda38
Showing 23 changed files with 2,620 additions and 70 deletions.
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteLogger;
import org.jetbrains.annotations.Nullable;
@@ -51,7 +52,7 @@ public class LongJvmPauseDetector implements IgniteComponent {
private static final int PRECISION = getInteger("IGNITE_JVM_PAUSE_DETECTOR_PRECISION", DFLT_JVM_PAUSE_DETECTOR_PRECISION);

/** Threshold. */
private static final int THRESHOLD = getInteger("IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD", DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD);
private final int threshold = getInteger("IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD", DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD);

/** Event count. */
private static final int EVT_CNT = getInteger("IGNITE_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT",
@@ -117,7 +118,7 @@ public void run() {
final long now = System.currentTimeMillis();
final long pause = now - PRECISION - lastWakeUpTime;

if (pause >= THRESHOLD) {
if (pause >= threshold) {
LOG.warn("Possible too long JVM pause: " + pause + " milliseconds.");

synchronized (LongJvmPauseDetector.this) {
@@ -241,9 +242,16 @@ synchronized Map<Long, Long> longPauseEvents() {
return new IgniteBiTuple<>(longPausesTimestamps[lastPauseIdx], longPausesDurations[lastPauseIdx]);
}

/**
 * Returns the long JVM pause threshold in milliseconds.
 *
 * @return Value of the {@code threshold} field, widened from {@code int} to {@code long}.
 */
public long longJvmPauseThreshold() {
return threshold;
}

/** {@inheritDoc} */
@Override
public String toString() {
return "S.toString(LongJVMPauseDetector.class, this)";
return S.toString(LongJvmPauseDetector.class, this);
}
}
@@ -421,21 +421,34 @@ public static CompletableFuture<Long> runMultiThreadedAsync(Callable<?> call, in
/**
* Waits for the condition.
*
* @param cond Condition.
* @param cond Condition.
* @param timeoutMillis Timeout in milliseconds.
* @return {@code True} if the condition was satisfied within the timeout.
* @throws InterruptedException If waiting was interrupted.
*/
@SuppressWarnings("BusyWait")
public static boolean waitForCondition(BooleanSupplier cond, long timeoutMillis) throws InterruptedException {
return waitForCondition(cond, 50, timeoutMillis);
}

/**
* Waits for the condition.
*
* @param cond Condition.
* @param sleepMillis Sleep interval between condition checks, in milliseconds.
* @param timeoutMillis Timeout in milliseconds.
* @return {@code True} if the condition was satisfied within the timeout.
* @throws InterruptedException If waiting was interrupted.
*/
@SuppressWarnings("BusyWait")
public static boolean waitForCondition(BooleanSupplier cond, long sleepMillis, long timeoutMillis) throws InterruptedException {
long stop = System.currentTimeMillis() + timeoutMillis;

while (System.currentTimeMillis() < stop) {
if (cond.getAsBoolean()) {
return true;
}

sleep(50);
sleep(sleepMillis);
}

return false;
@@ -83,6 +83,7 @@
import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointMetricsTracker;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
import org.apache.ignite.internal.pagememory.persistence.replacement.ClockPageReplacementPolicyFactory;
import org.apache.ignite.internal.pagememory.persistence.replacement.DelayedPageReplacementTracker;
@@ -1186,7 +1187,7 @@ private void setDirty(FullPageId pageId, long absPtr, boolean dirty, boolean for
boolean wasDirty = dirty(absPtr, dirty);

if (dirty) {
// TODO: IGNITE-16935 Don't forget to add an assertion that the checkpoint lock is held by this thread
// TODO: IGNITE-16984 Don't forget to add an assertion that the checkpoint lock is held by this thread

if (!wasDirty || forceAdd) {
Segment seg = segment(pageId.groupId(), pageId.pageId());
@@ -1800,14 +1801,16 @@ boolean clearCheckpoint(FullPageId fullPageId) {
* @param pageSingleAcquire Page is acquired only once. We don't pin the page a second time (until the page is copied) in case
* the checkpoint temporary buffer is used.
* @param pageStoreWriter Checkpoint page writer.
* @param tracker Checkpoint metrics tracker.
*/
private void copyPageForCheckpoint(
long absPtr,
FullPageId fullId,
ByteBuffer buf,
int tag,
boolean pageSingleAcquire,
PageStoreWriter pageStoreWriter
PageStoreWriter pageStoreWriter,
CheckpointMetricsTracker tracker
) throws IgniteInternalCheckedException {
assert absPtr != 0;
assert isAcquired(absPtr) || !isInCheckpoint(fullId);
@@ -1858,6 +1861,8 @@ private void copyPageForCheckpoint(

zeroMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize());

tracker.onCopyOnWritePageWritten();

releaseCheckpointBufferPage(tmpRelPtr);

// Need release again because we pin page when resolve abs pointer,
@@ -1899,12 +1904,14 @@ private void copyPageForCheckpoint(
* #beginCheckpoint(CompletableFuture)} method call.
* @param buf Temporary buffer to write changes into.
* @param pageStoreWriter Checkpoint page write context.
* @param tracker Checkpoint metrics tracker.
* @throws IgniteInternalCheckedException If failed to obtain page data.
*/
public void checkpointWritePage(
FullPageId fullId,
ByteBuffer buf,
PageStoreWriter pageStoreWriter
PageStoreWriter pageStoreWriter,
CheckpointMetricsTracker tracker
) throws IgniteInternalCheckedException {
assert buf.remaining() == pageSize();

@@ -1975,7 +1982,7 @@ public void checkpointWritePage(
}
}

copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter);
copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter, tracker);
}

/**
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
import org.apache.ignite.lang.IgniteInternalCheckedException;

/**
@@ -23,7 +23,6 @@
import static java.nio.file.Files.isDirectory;
import static java.nio.file.Files.list;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.partitioningBy;
import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toList;

@@ -159,16 +158,17 @@ private static void checkCheckpointDir(Path checkpointDir) throws IgniteInternal
assert isDirectory(checkpointDir) : checkpointDir;

try {
Map<Boolean, List<Path>> files = list(checkpointDir)
.collect(partitioningBy(path -> parseCheckpointIdFromMarkerFile(path) != null));
List<Path> notCheckpointMarkers = list(checkpointDir)
.filter(path -> parseCheckpointIdFromMarkerFile(path) == null)
.collect(toList());

if (!files.get(false).isEmpty()) {
if (!notCheckpointMarkers.isEmpty()) {
throw new IgniteInternalCheckedException(
"Not checkpoint markers found, they need to be removed manually: " + files.get(false)
"Not checkpoint markers found, they need to be removed manually: " + notCheckpointMarkers
);
}

Map<UUID, List<Path>> checkpointMarkers = files.get(true).stream()
Map<UUID, List<Path>> checkpointMarkers = list(checkpointDir)
.collect(groupingBy(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile));

List<UUID> checkpointsWithoutEndMarker = checkpointMarkers.entrySet().stream()

0 comments on commit 0b2541c

Please sign in to comment.