Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ public class PageMemoryCheckpointConfigurationSchema {
/** Number of checkpoint threads. */
@Range(min = 1)
@Value(hasDefault = true)
public int threads = 4;
public int checkpointThreads = 4;

/** Number of threads to compact delta files. */
@Range(min = 1)
@Value(hasDefault = true)
public int compactionThreads = 4;

/** Timeout for checkpoint read lock acquisition in milliseconds. */
@Range(min = 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor;
import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
Expand Down Expand Up @@ -76,6 +77,9 @@ public class CheckpointManager {
/** File page store manager. */
private final FilePageStoreManager filePageStoreManager;

/** Delta file compactor. */
private final Compactor compactor;

/**
* Constructor.
*
Expand Down Expand Up @@ -133,6 +137,15 @@ public CheckpointManager(
pageSize
);

compactor = new Compactor(
Loggers.forClass(Compactor.class),
igniteInstanceName,
workerListener,
checkpointConfig.compactionThreads(),
filePageStoreManager,
pageSize
);

checkpointer = new Checkpointer(
Loggers.forClass(Checkpoint.class),
igniteInstanceName,
Expand All @@ -141,6 +154,7 @@ public CheckpointManager(
checkpointWorkflow,
checkpointPagesWriterFactory,
filePageStoreManager,
compactor,
checkpointConfig
);

Expand All @@ -162,6 +176,8 @@ public void start() {
checkpointer.start();

checkpointTimeoutLock.start();

compactor.start();
}

/**
Expand All @@ -171,7 +187,8 @@ public void stop() throws Exception {
IgniteUtils.closeAll(
checkpointTimeoutLock::stop,
checkpointer::stop,
checkpointWorkflow::stop
checkpointWorkflow::stop,
compactor::stop
);
}

Expand Down Expand Up @@ -285,4 +302,13 @@ static int[] pageIndexesForDeltaFilePageStore(CheckpointDirtyPagesView partition

return pageIndexes;
}

/**
* Adds the number of delta files to compact.
*
* @param count Number of delta files.
*/
public void addDeltaFileCountForCompaction(int count) {
compactor.addDeltaFiles(count);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public class CheckpointPagesWriter implements Runnable {
/** Future which should be finished when all pages would be written. */
private final CompletableFuture<?> doneFut;

/** Some action which will be executed every time before page will be written. */
private final Runnable beforePageWrite;
/** Update heartbeat callback. */
private final Runnable updateHeartbeat;

/** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */
private final ThreadLocal<ByteBuffer> threadBuf;
Expand All @@ -103,7 +103,7 @@ public class CheckpointPagesWriter implements Runnable {
* @param writePageIds Queue of dirty page IDs to write.
* @param updatedPartitions Updated partitions.
* @param doneFut Done future.
* @param beforePageWrite Action to be performed before every page write.
* @param updateHeartbeat Update heartbeat callback.
* @param log Logger.
* @param threadBuf Thread local byte buffer.
* @param checkpointProgress Checkpoint progress.
Expand All @@ -118,7 +118,7 @@ public class CheckpointPagesWriter implements Runnable {
IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> writePageIds,
ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
CompletableFuture<?> doneFut,
Runnable beforePageWrite,
Runnable updateHeartbeat,
ThreadLocal<ByteBuffer> threadBuf,
CheckpointProgressImpl checkpointProgress,
WriteDirtyPage pageWriter,
Expand All @@ -131,7 +131,7 @@ public class CheckpointPagesWriter implements Runnable {
this.writePageIds = writePageIds;
this.updatedPartitions = updatedPartitions;
this.doneFut = doneFut;
this.beforePageWrite = beforePageWrite;
this.updateHeartbeat = updateHeartbeat;
this.threadBuf = threadBuf;
this.checkpointProgress = checkpointProgress;
this.pageWriter = pageWriter;
Expand Down Expand Up @@ -183,7 +183,7 @@ private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> writePa
AtomicBoolean writeMetaPage = new AtomicBoolean();

while (!shutdownNow.getAsBoolean() && writePageIds.next(queueResult)) {
beforePageWrite.run();
updateHeartbeat.run();

FullPageId fullId = queueResult.getValue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class CheckpointPagesWriterFactory {
* @param dirtyPageIdQueue Checkpoint dirty page ID queue to write.
* @param updatedPartitions Updated partitions.
* @param doneWriteFut Write done future.
* @param beforePageWrite Before page write callback.
* @param updateHeartbeat Update heartbeat callback.
* @param checkpointProgress Current checkpoint data.
* @param shutdownNow Checker of stop operation.
*/
Expand All @@ -100,7 +100,7 @@ CheckpointPagesWriter build(
IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> dirtyPageIdQueue,
ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
CompletableFuture<?> doneWriteFut,
Runnable beforePageWrite,
Runnable updateHeartbeat,
CheckpointProgressImpl checkpointProgress,
// TODO: IGNITE-16993 Consider a lock replacement
BooleanSupplier shutdownNow
Expand All @@ -111,7 +111,7 @@ CheckpointPagesWriter build(
dirtyPageIdQueue,
updatedPartitions,
doneWriteFut,
beforePageWrite,
updateHeartbeat,
threadBuf,
checkpointProgress,
dirtyPageWriter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -48,6 +47,7 @@
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor;
import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
Expand Down Expand Up @@ -139,6 +139,9 @@ public class Checkpointer extends IgniteWorker {
/** File page store manager. */
private final FilePageStoreManager filePageStoreManager;

/** Delta file compactor. */
private final Compactor compactor;

/**
* Constructor.
*
Expand All @@ -149,6 +152,7 @@ public class Checkpointer extends IgniteWorker {
* @param checkpointWorkFlow Implementation of checkpoint.
* @param factory Page writer factory.
* @param filePageStoreManager File page store manager.
* @param compactor Delta file compactor.
* @param checkpointConfig Checkpoint configuration.
*/
Checkpointer(
Expand All @@ -159,6 +163,7 @@ public class Checkpointer extends IgniteWorker {
CheckpointWorkflow checkpointWorkFlow,
CheckpointPagesWriterFactory factory,
FilePageStoreManager filePageStoreManager,
Compactor compactor,
PageMemoryCheckpointConfiguration checkpointConfig
) {
super(log, igniteInstanceName, "checkpoint-thread", workerListener);
Expand All @@ -168,10 +173,11 @@ public class Checkpointer extends IgniteWorker {
this.checkpointWorkflow = checkpointWorkFlow;
this.checkpointPagesWriterFactory = factory;
this.filePageStoreManager = filePageStoreManager;
this.compactor = compactor;

scheduledCheckpointProgress = new CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));

int checkpointWritePageThreads = checkpointConfig.threads().value();
int checkpointWritePageThreads = checkpointConfig.checkpointThreads().value();

if (checkpointWritePageThreads > 1) {
checkpointWritePagesPool = new ThreadPoolExecutor(
Expand Down Expand Up @@ -405,12 +411,7 @@ boolean writePages(
if (pageWritePool == null) {
write.run();
} else {
try {
pageWritePool.execute(write);
} catch (RejectedExecutionException ignore) {
// Run the task synchronously.
write.run();
}
pageWritePool.execute(write);
}
}

Expand All @@ -431,6 +432,8 @@ boolean writePages(

syncUpdatedPageStores(updatedPartitions);

compactor.addDeltaFiles(updatedPartitions.size());

if (shutdownNow.getAsBoolean()) {
currentCheckpointProgress.fail(new NodeStoppingException("Node is stopping."));

Expand Down
Loading