diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java index b80a18a30185..2352b360a413 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java @@ -39,7 +39,12 @@ public class PageMemoryCheckpointConfigurationSchema { /** Number of checkpoint threads. */ @Range(min = 1) @Value(hasDefault = true) - public int threads = 4; + public int checkpointThreads = 4; + + /** Number of threads to compact delta files. */ + @Range(min = 1) + @Value(hasDefault = true) + public int compactionThreads = 4; /** Timeout for checkpoint read lock acquisition in milliseconds. */ @Range(min = 0) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java index d94782e492a9..57e764058297 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView; +import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor; import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; @@ -76,6 +77,9 @@ public class CheckpointManager { /** File page store manager. */ private final FilePageStoreManager filePageStoreManager; + /** Delta file compactor. */ + private final Compactor compactor; + /** * Constructor. * @@ -133,6 +137,15 @@ public CheckpointManager( pageSize ); + compactor = new Compactor( + Loggers.forClass(Compactor.class), + igniteInstanceName, + workerListener, + checkpointConfig.compactionThreads(), + filePageStoreManager, + pageSize + ); + checkpointer = new Checkpointer( Loggers.forClass(Checkpoint.class), igniteInstanceName, @@ -141,6 +154,7 @@ public CheckpointManager( checkpointWorkflow, checkpointPagesWriterFactory, filePageStoreManager, + compactor, checkpointConfig ); @@ -162,6 +176,8 @@ public void start() { checkpointer.start(); checkpointTimeoutLock.start(); + + compactor.start(); } /** @@ -171,7 +187,8 @@ public void stop() throws Exception { IgniteUtils.closeAll( checkpointTimeoutLock::stop, checkpointer::stop, - checkpointWorkflow::stop + checkpointWorkflow::stop, + compactor::stop ); } @@ -285,4 +302,13 @@ static int[] pageIndexesForDeltaFilePageStore(CheckpointDirtyPagesView partition return pageIndexes; } + + /** + * Adds the number of delta files to compact. + * + * @param count Number of delta files. + */ + public void addDeltaFileCountForCompaction(int count) { + compactor.addDeltaFiles(count); + } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java index 6da756a10324..29adbde0f13e 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java @@ -75,8 +75,8 @@ public class CheckpointPagesWriter implements Runnable { /** Future which should be finished when all pages would be written. */ private final CompletableFuture doneFut; - /** Some action which will be executed every time before page will be written. */ - private final Runnable beforePageWrite; + /** Update heartbeat callback. */ + private final Runnable updateHeartbeat; /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */ private final ThreadLocal threadBuf; @@ -103,7 +103,7 @@ public class CheckpointPagesWriter implements Runnable { * @param writePageIds Queue of dirty page IDs to write. * @param updatedPartitions Updated partitions. * @param doneFut Done future. - * @param beforePageWrite Action to be performed before every page write. + * @param updateHeartbeat Update heartbeat callback. * @param log Logger. * @param threadBuf Thread local byte buffer. * @param checkpointProgress Checkpoint progress. @@ -118,7 +118,7 @@ public class CheckpointPagesWriter implements Runnable { IgniteConcurrentMultiPairQueue writePageIds, ConcurrentMap updatedPartitions, CompletableFuture doneFut, - Runnable beforePageWrite, + Runnable updateHeartbeat, ThreadLocal threadBuf, CheckpointProgressImpl checkpointProgress, WriteDirtyPage pageWriter, @@ -131,7 +131,7 @@ public class CheckpointPagesWriter implements Runnable { this.writePageIds = writePageIds; this.updatedPartitions = updatedPartitions; this.doneFut = doneFut; - this.beforePageWrite = beforePageWrite; + this.updateHeartbeat = updateHeartbeat; this.threadBuf = threadBuf; this.checkpointProgress = checkpointProgress; this.pageWriter = pageWriter; @@ -183,7 +183,7 @@ private IgniteConcurrentMultiPairQueue writePa AtomicBoolean writeMetaPage = new AtomicBoolean(); while (!shutdownNow.getAsBoolean() && writePageIds.next(queueResult)) { - beforePageWrite.run(); + updateHeartbeat.run(); FullPageId fullId = queueResult.getValue(); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java index 831cbc0e49d7..ee8d15b20675 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java @@ -91,7 +91,7 @@ public class CheckpointPagesWriterFactory { * @param dirtyPageIdQueue Checkpoint dirty page ID queue to write. * @param updatedPartitions Updated partitions. * @param doneWriteFut Write done future. - * @param beforePageWrite Before page write callback. + * @param updateHeartbeat Update heartbeat callback. * @param checkpointProgress Current checkpoint data. * @param shutdownNow Checker of stop operation. */ @@ -100,7 +100,7 @@ CheckpointPagesWriter build( IgniteConcurrentMultiPairQueue dirtyPageIdQueue, ConcurrentMap updatedPartitions, CompletableFuture doneWriteFut, - Runnable beforePageWrite, + Runnable updateHeartbeat, CheckpointProgressImpl checkpointProgress, // TODO: IGNITE-16993 Consider a lock replacement BooleanSupplier shutdownNow @@ -111,7 +111,7 @@ CheckpointPagesWriter build( dirtyPageIdQueue, updatedPartitions, doneWriteFut, - beforePageWrite, + updateHeartbeat, threadBuf, checkpointProgress, dirtyPageWriter, diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java index f8569013a990..1725816ddb1c 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java @@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.LongAdder; @@ -48,6 +47,7 @@ import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView; import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; +import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor; import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; @@ -139,6 +139,9 @@ public class Checkpointer extends IgniteWorker { /** File page store manager. */ private final FilePageStoreManager filePageStoreManager; + /** Delta file compactor. */ + private final Compactor compactor; + /** * Constructor. * @@ -149,6 +152,7 @@ public class Checkpointer extends IgniteWorker { * @param checkpointWorkFlow Implementation of checkpoint. * @param factory Page writer factory. * @param filePageStoreManager File page store manager. + * @param compactor Delta file compactor. * @param checkpointConfig Checkpoint configuration. */ Checkpointer( @@ -159,6 +163,7 @@ public class Checkpointer extends IgniteWorker { CheckpointWorkflow checkpointWorkFlow, CheckpointPagesWriterFactory factory, FilePageStoreManager filePageStoreManager, + Compactor compactor, PageMemoryCheckpointConfiguration checkpointConfig ) { super(log, igniteInstanceName, "checkpoint-thread", workerListener); @@ -168,10 +173,11 @@ public class Checkpointer extends IgniteWorker { this.checkpointWorkflow = checkpointWorkFlow; this.checkpointPagesWriterFactory = factory; this.filePageStoreManager = filePageStoreManager; + this.compactor = compactor; scheduledCheckpointProgress = new CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval())); - int checkpointWritePageThreads = checkpointConfig.threads().value(); + int checkpointWritePageThreads = checkpointConfig.checkpointThreads().value(); if (checkpointWritePageThreads > 1) { checkpointWritePagesPool = new ThreadPoolExecutor( @@ -405,12 +411,7 @@ boolean writePages( if (pageWritePool == null) { write.run(); } else { - try { - pageWritePool.execute(write); - } catch (RejectedExecutionException ignore) { - // Run the task synchronously. - write.run(); - } + pageWritePool.execute(write); } } @@ -431,6 +432,8 @@ boolean writePages( syncUpdatedPageStores(updatedPartitions); + compactor.addDeltaFiles(updatedPartitions.size()); + if (shutdownNow.getAsBoolean()) { currentCheckpointProgress.fail(new NodeStoppingException("Node is stopping.")); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java new file mode 100644 index 000000000000..bf9f285ee8f7 --- /dev/null +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.compaction; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.stream.Collectors.toCollection; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.configuration.ConfigurationValue; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; +import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; +import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; +import org.apache.ignite.internal.thread.IgniteThread; +import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.worker.IgniteWorker; +import org.apache.ignite.internal.util.worker.IgniteWorkerListener; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInternalException; +import org.jetbrains.annotations.Nullable; + +/** + * Entity to compact delta files. + * + *

To start compacting delta files, you need to notify about the appearance of {@link #addDeltaFiles(int) delta files ready for + * compaction}. Then all delta files {@link FilePageStore#getDeltaFileToCompaction() ready for compaction} will be collected and merged with + * their {@link FilePageStore file page stores} until all delta files are compacted. + * + *

Delta file compaction process consists of: + *

    + *
  • Copying pages from a delta file to a partition file.
  • + *
  • Fsync of the partition file.
  • + *
  • Remove delta file from {@link FilePageStore} and file system.
  • + *
+ */ +public class Compactor extends IgniteWorker { + private final Object mux = new Object(); + + private final @Nullable ThreadPoolExecutor threadPoolExecutor; + + private final AtomicInteger deltaFileCount = new AtomicInteger(); + + private final FilePageStoreManager filePageStoreManager; + + /** Thread local with buffers for the compaction threads. */ + private final ThreadLocal threadBuf; + + /** + * Creates new ignite worker with given parameters. + * + * @param log Logger. + * @param igniteInstanceName Name of the Ignite instance this runnable is used in. + * @param listener Listener for life-cycle events. + * @param threads Number of compaction threads. + * @param filePageStoreManager File page store manager. + * @param pageSize Page size in bytes. + */ + public Compactor( + IgniteLogger log, + String igniteInstanceName, + @Nullable IgniteWorkerListener listener, + ConfigurationValue threads, + FilePageStoreManager filePageStoreManager, + int pageSize + ) { + super(log, igniteInstanceName, "compaction-thread", listener); + + this.filePageStoreManager = filePageStoreManager; + + threadBuf = ThreadLocal.withInitial(() -> { + ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize); + + tmpWriteBuf.order(ByteOrder.nativeOrder()); + + return tmpWriteBuf; + }); + + int threadCount = threads.value(); + + if (threadCount > 1) { + threadPoolExecutor = new ThreadPoolExecutor( + threadCount, + threadCount, + 30_000, + MILLISECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("compaction-runner-io", log) + ); + } else { + threadPoolExecutor = null; + } + } + + /** {@inheritDoc} */ + @Override + protected void body() throws InterruptedException { + try { + while (!isCancelled()) { + waitDeltaFiles(); + + if (isCancelled()) { + log.info("Skipping the delta file compaction because the node is stopping"); + + return; + } + + doCompaction(); + } + } catch (Throwable t) { + // TODO: IGNITE-16899 By analogy with 2.0, we need to handle the exception (err) by the FailureProcessor + + throw new IgniteInternalException(t); + } + } + + /** + * Waiting for delta files. + */ + void waitDeltaFiles() { + try { + synchronized (mux) { + while (deltaFileCount.get() == 0 && !isCancelled()) { + blockingSectionBegin(); + + try { + mux.wait(); + } finally { + blockingSectionEnd(); + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + isCancelled.set(true); + } + } + + /** + * Adds the number of delta files to compact. + * + * @param count Number of delta files. + */ + public void addDeltaFiles(int count) { + assert count >= 0; + + if (count > 0) { + deltaFileCount.addAndGet(count); + + synchronized (mux) { + mux.notifyAll(); + } + } + } + + /** + * Merges delta files with partition files. + */ + void doCompaction() { + // Let's collect one delta file for each partition. + Queue> queue = filePageStoreManager.allPageStores().stream() + .flatMap(List::stream) + .map(filePageStore -> { + DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction(); + + return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction); + }) + .filter(Objects::nonNull) + .collect(toCollection(ConcurrentLinkedQueue::new)); + + assert !queue.isEmpty(); + + updateHeartbeat(); + + int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize(); + + CompletableFuture[] futures = new CompletableFuture[threads]; + + for (int i = 0; i < threads; i++) { + CompletableFuture future = futures[i] = new CompletableFuture<>(); + + Runnable merger = () -> { + IgniteBiTuple toMerge; + + try { + while ((toMerge = queue.poll()) != null) { + mergeDeltaFileToMainFile(toMerge.get1(), toMerge.get2()); + } + } catch (Throwable ex) { + future.completeExceptionally(ex); + } + + future.complete(null); + }; + + if (isCancelled()) { + return; + } + + if (threadPoolExecutor == null) { + merger.run(); + } else { + threadPoolExecutor.execute(merger); + } + } + + updateHeartbeat(); + + // Wait and check for errors. + CompletableFuture.allOf(futures).join(); + } + + /** + * Starts the compacter. + */ + public void start() { + if (runner() != null) { + return; + } + + assert runner() == null : "Compacter is running"; + + new IgniteThread(this).start(); + } + + /** + * Stops the compacter. + */ + public void stop() throws Exception { + cancel(); + + boolean interrupt = false; + + while (true) { + try { + join(); + + break; + } catch (InterruptedException e) { + interrupt = true; + } + } + + if (interrupt) { + Thread.currentThread().interrupt(); + } + + if (threadPoolExecutor != null) { + IgniteUtils.shutdownAndAwaitTermination(threadPoolExecutor, 2, TimeUnit.MINUTES); + } + } + + /** {@inheritDoc} */ + @Override + public void cancel() { + if (log.isDebugEnabled()) { + log.debug("Cancelling grid runnable: " + this); + } + + // Do not interrupt runner thread. + isCancelled.set(true); + + synchronized (mux) { + mux.notifyAll(); + } + } + + /** + * Merges the main file page store with the delta file page store. + * + *

Steps: + *

    + *
  • Copy pages from delta file page store to file page store.
  • + *
  • Fsync the file page store.
  • + *
  • Removing the delta file page store from a file page store.
  • + *
+ * + * @param filePageStore File page store. + * @param deltaFilePageStore Delta file page store. + * @throws Throwable If failed. + */ + void mergeDeltaFileToMainFile( + FilePageStore filePageStore, + DeltaFilePageStoreIo deltaFilePageStore + ) throws Throwable { + // Copy pages deltaFilePageStore -> filePageStore. + ByteBuffer buffer = threadBuf.get(); + + for (long pageIndex : deltaFilePageStore.pageIndexes()) { + updateHeartbeat(); + + if (isCancelled()) { + return; + } + + long pageOffset = deltaFilePageStore.pageOffset(pageIndex); + + // pageIndex instead of pageId, only for debugging in case of errors + // since we do not know the pageId until we read it from the pageOffset. + boolean read = deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageIndex, pageOffset, buffer.rewind(), false); + + assert read : deltaFilePageStore.filePath(); + + long pageId = PageIo.getPageId(buffer.rewind()); + + assert pageId != 0 : deltaFilePageStore.filePath(); + + updateHeartbeat(); + + if (isCancelled()) { + return; + } + + filePageStore.write(pageId, buffer.rewind(), true); + } + + // Fsync the file page store. + updateHeartbeat(); + + if (isCancelled()) { + return; + } + + filePageStore.sync(); + + // Removing the delta file page store from a file page store. + updateHeartbeat(); + + if (isCancelled()) { + return; + } + + boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore); + + assert removed : filePageStore.filePath(); + + deltaFilePageStore.markMergedToFilePageStore(); + + deltaFilePageStore.stop(true); + + deltaFileCount.decrementAndGet(); + } +} diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java index 9c13199387e8..7043f90abf3b 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java @@ -64,7 +64,11 @@ public abstract class AbstractFilePageStoreIo implements Closeable { /** Initialized file page store IO. */ private volatile boolean initialized; - /** Caches the existence state of file. After it is initialized, it will be not {@code null} during lifecycle. */ + /** + * Caches the existence state of file. After it is initialized, it will be not {@code null} during lifecycle. + * + *

Guarded by {@link #readWriteLock}. + */ private @Nullable Boolean fileExists; /** @@ -140,7 +144,7 @@ public void close() throws IOException { * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}. * @throws IgniteInternalCheckedException If reading failed (IO error occurred). */ - public void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { + protected void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { read0(pageId, pageOff, pageBuf, !skipCrc, keepCrc); } @@ -513,8 +517,6 @@ private void read0( if (keepCrc) { PageIo.setCrc(pageBuf, savedCrc32); } - - return; } catch (IOException e) { throw new IgniteInternalCheckedException("Failed to read page [file=" + filePath + ", pageId=" + pageId + "]", e); } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java index 2fd974a82b1c..7d5256a16944 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java @@ -31,6 +31,8 @@ import java.nio.file.Path; import org.apache.ignite.internal.fileio.FileIo; import org.apache.ignite.internal.fileio.FileIoFactory; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.lang.IgniteInternalCheckedException; /** * Implementation of the class for working with the delta file page storage IO. @@ -38,6 +40,9 @@ public class DeltaFilePageStoreIo extends AbstractFilePageStoreIo { private final DeltaFilePageStoreIoHeader header; + /** Lock to prevent reads after merging with a file page store. */ + private final IgniteSpinBusyLock mergedBusyLock = new IgniteSpinBusyLock(); + /** * Constructor. * @@ -95,7 +100,16 @@ public void checkHeader(FileIo fileIo) throws IOException { */ @Override public long pageOffset(long pageId) { - int searchResult = binarySearch(header.pageIndexes(), pageIndex(pageId)); + return pageOffset(pageIndex(pageId)); + } + + /** + * Returns page offset within the store file, {@code -1} if page not found in delta file. + * + * @param pageIdx Page index. + */ + public long pageOffset(int pageIdx) { + int searchResult = binarySearch(header.pageIndexes(), pageIdx); if (searchResult < 0) { return -1; @@ -104,10 +118,56 @@ public long pageOffset(long pageId) { return (long) searchResult * pageSize() + headerSize(); } + /** + * Reads a page. + * + * @param pageId Page ID. + * @param pageBuf Page buffer to read into. + * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}. + * @return {@code True} if the page was successfully read, otherwise the delta file page store is {@link #markMergedToFilePageStore() + * merged} with the file page store (must be read from file page store). + * @throws IgniteInternalCheckedException If reading failed (IO error occurred). + */ + public boolean readWithMergedToFilePageStoreCheck( + long pageId, + long pageOff, + ByteBuffer pageBuf, + boolean keepCrc + ) throws IgniteInternalCheckedException { + if (!mergedBusyLock.enterBusy()) { + return false; + } + + try { + super.read(pageId, pageOff, pageBuf, keepCrc); + + return true; + } finally { + mergedBusyLock.leaveBusy(); + } + } + /** * Returns the index of the delta file page store. */ public int fileIndex() { return header.index(); } + + /** + * Marks that the delta file page store has been merged with the file page store. + * + *

It waits for all current {@link #readWithMergedToFilePageStoreCheck(long, long, ByteBuffer, boolean) readings} to end, and + * subsequent ones will return {@code false} (will need to read from the file page store not from delta file page store). + */ + public void markMergedToFilePageStore() { + mergedBusyLock.block(); + } + + /** + * Returns page indexes of the delta file page store. + */ + public int[] pageIndexes() { + return header.pageIndexes(); + } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java index 9ff231861120..5a3b3a2e1825 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java @@ -26,6 +26,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; @@ -168,9 +169,9 @@ public void readWithoutPageIdCheck(long pageId, ByteBuffer pageBuf, boolean keep long pageOff = deltaFilePageStoreIo.pageOffset(pageId); if (pageOff >= 0) { - deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc); - - return; + if (deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, pageBuf, keepCrc)) { + return; + } } } @@ -182,17 +183,7 @@ public void readWithoutPageIdCheck(long pageId, ByteBuffer pageBuf, boolean keep public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { assert pageIndex(pageId) <= pageCount : "pageIdx=" + pageIndex(pageId) + ", pageCount=" + pageCount; - for (DeltaFilePageStoreIo deltaFilePageStoreIo : deltaFilePageStoreIos) { - long pageOff = deltaFilePageStoreIo.pageOffset(pageId); - - if (pageOff >= 0) { - deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc); - - return; - } - } - - filePageStoreIo.read(pageId, filePageStoreIo.pageOffset(pageId), pageBuf, keepCrc); + readWithoutPageIdCheck(pageId, pageBuf, keepCrc); } /** {@inheritDoc} */ @@ -344,4 +335,44 @@ public void completeNewDeltaFile() { public int deltaFileCount() { return deltaFilePageStoreIos.size(); } + + /** + * Returns the delta file to compaction (oldest). + * + *

Thread safe. + */ + public @Nullable DeltaFilePageStoreIo getDeltaFileToCompaction() { + // Snapshot of delta files. + Iterator iterator = deltaFilePageStoreIos.iterator(); + + // Last one is the oldest. + DeltaFilePageStoreIo last = null; + + int count = 0; + + while (iterator.hasNext()) { + last = iterator.next(); + + count++; + } + + // If last is just created, then it cannot be compacted yet. + if (count == 1 && newDeltaFilePageStoreIoFuture != null) { + last = null; + } + + return last; + } + + /** + * Deletes delta file. + * + *

Thread safe. + * + * @param deltaFilePageStoreIo Delta file to be deleted. + * @return {@code True} if the delta file being removed was present. + */ + public boolean removeDeltaFile(DeltaFilePageStoreIo deltaFilePageStoreIo) { + return deltaFilePageStoreIos.remove(deltaFilePageStoreIo); + } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java index 0d340c9e9b59..e14d397bf533 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.fileio.FileIo; import org.apache.ignite.internal.fileio.FileIoFactory; import org.apache.ignite.internal.pagememory.util.PageIdUtils; +import org.apache.ignite.lang.IgniteInternalCheckedException; /** * Implementation of the class for working with the file page file storage IO. @@ -83,6 +84,12 @@ public void checkHeader(FileIo fileIo) throws IOException { checkFilePageSize(this.header.pageSize(), header.pageSize()); } + /** {@inheritDoc} */ + @Override + public void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { + super.read(pageId, pageOff, pageBuf, keepCrc); + } + /** {@inheritDoc} */ @Override public long pageOffset(long pageId) { diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java index 64d67fd2829b..70a15608c153 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java @@ -21,6 +21,8 @@ import static java.util.Collections.unmodifiableList; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID; +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId; import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer; import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer; import static org.apache.ignite.internal.util.IgniteUtils.closeAll; @@ -82,9 +84,6 @@ public class FilePageStoreManager implements PageReadWriteManager { /** Page size in bytes. */ private final int pageSize; - /** Page read write manager. */ - private final PageReadWriteManager pageReadWriteManager = new PageReadWriteManagerImpl(this); - /** * Executor to disallow running code that modifies data in {@link #groupPageStores} concurrently with cleanup of file page store. */ @@ -180,7 +179,15 @@ public void stop() throws Exception { /** {@inheritDoc} */ @Override public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { - pageReadWriteManager.read(grpId, pageId, pageBuf, keepCrc); + FilePageStore pageStore = getStore(grpId, partitionId(pageId)); + + try { + pageStore.read(pageId, pageBuf, keepCrc); + } catch (IgniteInternalCheckedException e) { + // TODO: IGNITE-16899 By analogy with 2.0, fail a node + + throw e; + } } /** {@inheritDoc} */ @@ -191,13 +198,35 @@ public PageStore write( ByteBuffer pageBuf, boolean calculateCrc ) throws IgniteInternalCheckedException { - return pageReadWriteManager.write(grpId, pageId, pageBuf, calculateCrc); + FilePageStore pageStore = getStore(grpId, partitionId(pageId)); + + try { + pageStore.write(pageId, pageBuf, calculateCrc); + } catch (IgniteInternalCheckedException e) { + // TODO: IGNITE-16899 By analogy with 2.0, fail a node + + throw e; + } + + return pageStore; } /** {@inheritDoc} */ @Override public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException { - return pageReadWriteManager.allocatePage(grpId, partId, flags); + assert partId >= 0 && partId <= MAX_PARTITION_ID : partId; + + FilePageStore pageStore = getStore(grpId, partId); + + try { + int pageIdx = pageStore.allocatePage(); + + return pageId(partId, flags, pageIdx); + } catch (IgniteInternalCheckedException e) { + // TODO: IGNITE-16899 By analogy with 2.0, fail a node + + throw e; + } } /** @@ -239,6 +268,13 @@ public void initialize(String tableName, int tableId, int partitions) throws Ign return groupPageStores.get(grpId); } + /** + * Returns all page stores of all groups. + */ + public Collection> allPageStores() { + return groupPageStores.allPageStores(); + } + /** * Returns partition file page store for the corresponding parameters. * diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java deleted file mode 100644 index 94e47164f66f..000000000000 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.pagememory.persistence.store; - -import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID; -import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; -import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId; - -import java.nio.ByteBuffer; -import org.apache.ignite.internal.tostring.IgniteToStringExclude; -import org.apache.ignite.lang.IgniteInternalCheckedException; - -/** - * {@link org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager} implementation. - */ -class PageReadWriteManagerImpl implements org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager { - @IgniteToStringExclude - protected final FilePageStoreManager filePageStoreManager; - - /** - * Constructor. - * - * @param filePageStoreManager File page store manager. - */ - public PageReadWriteManagerImpl(FilePageStoreManager filePageStoreManager) { - this.filePageStoreManager = filePageStoreManager; - } - - /** {@inheritDoc} */ - @Override - public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { - FilePageStore pageStore = filePageStoreManager.getStore(grpId, partitionId(pageId)); - - try { - pageStore.read(pageId, pageBuf, keepCrc); - } catch (IgniteInternalCheckedException e) { - // TODO: IGNITE-16899 By analogy with 2.0, fail a node - - throw e; - } - } - - /** {@inheritDoc} */ - @Override - public PageStore write( - int grpId, - long pageId, - ByteBuffer pageBuf, - boolean calculateCrc - ) throws IgniteInternalCheckedException { - FilePageStore pageStore = filePageStoreManager.getStore(grpId, partitionId(pageId)); - - try { - pageStore.write(pageId, pageBuf, calculateCrc); - } catch (IgniteInternalCheckedException e) { - // TODO: IGNITE-16899 By analogy with 2.0, fail a node - - throw e; - } - - return pageStore; - } - - /** {@inheritDoc} */ - @Override - public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException { - assert partId >= 0 && partId <= MAX_PARTITION_ID : partId; - - FilePageStore pageStore = filePageStoreManager.getStore(grpId, partId); - - try { - int pageIdx = pageStore.allocatePage(); - - return pageId(partId, flags, pageIdx); - } catch (IgniteInternalCheckedException e) { - // TODO: IGNITE-16899 By analogy with 2.0, fail a node - - throw e; - } - } -} diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java index 9866ec28ae67..66d778a6a3f2 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java @@ -51,8 +51,7 @@ public interface PageStore extends Closeable { * * @param pageId Page ID. * @param pageBuf Page buffer to read into. - * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code - * keepCrc}. + * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}. * @throws IgniteInternalCheckedException If reading failed (IO error occurred). */ void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException; diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java index d4ad1f6b4805..3e5f2b05f1a7 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java @@ -46,6 +46,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -71,6 +72,7 @@ import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage; +import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor; import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; @@ -91,7 +93,7 @@ public class CheckpointerTest { private final IgniteLogger log = Loggers.forClass(CheckpointerTest.class); - @InjectConfiguration("mock : {threads=1, frequency=1000, frequencyDeviation=0}") + @InjectConfiguration("mock : {checkpointThreads=1, frequency=1000, frequencyDeviation=0}") private PageMemoryCheckpointConfiguration checkpointConfig; @BeforeAll @@ -116,6 +118,7 @@ void testStartAndStop() throws Exception { createCheckpointWorkflow(EMPTY), createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)), mock(FilePageStoreManager.class), + mock(Compactor.class), checkpointConfig ); @@ -148,6 +151,7 @@ void testScheduleCheckpoint() { mock(CheckpointWorkflow.class), mock(CheckpointPagesWriterFactory.class), mock(FilePageStoreManager.class), + mock(Compactor.class), checkpointConfig )); @@ -247,6 +251,7 @@ void testWaitCheckpointEvent() throws Exception { mock(CheckpointWorkflow.class), mock(CheckpointPagesWriterFactory.class), mock(FilePageStoreManager.class), + mock(Compactor.class), checkpointConfig ); @@ -275,6 +280,7 @@ void testCheckpointBody() throws Exception { createCheckpointWorkflow(EMPTY), createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)), mock(FilePageStoreManager.class), + mock(Compactor.class), checkpointConfig )); @@ -352,6 +358,8 @@ void testDoCheckpoint() throws Exception { when(filePageStore.getNewDeltaFile()).thenReturn(completedFuture(mock(DeltaFilePageStoreIo.class))); + Compactor compactor = mock(Compactor.class); + Checkpointer checkpointer = spy(new Checkpointer( log, "test", @@ -360,6 +368,7 @@ void testDoCheckpoint() throws Exception { createCheckpointWorkflow(dirtyPages), createCheckpointPagesWriterFactory(partitionMetaManager), createFilePageStoreManager(Map.of(new GroupPartitionId(0, 0), filePageStore)), + compactor, checkpointConfig )); @@ -367,6 +376,7 @@ void testDoCheckpoint() throws Exception { verify(dirtyPages, times(1)).toDirtyPageIdQueue(); verify(checkpointer, times(1)).startCheckpointProgress(); + verify(compactor, times(1)).addDeltaFiles(eq(1)); assertEquals(checkpointer.lastCheckpointProgress().currentCheckpointPagesCount(), 3); } @@ -381,6 +391,7 @@ void testNextCheckpointInterval() throws Exception { mock(CheckpointWorkflow.class), mock(CheckpointPagesWriterFactory.class), mock(FilePageStoreManager.class), + mock(Compactor.class), checkpointConfig ); diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java new file mode 100644 index 000000000000..d065efcda943 --- /dev/null +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.compaction; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import org.apache.ignite.configuration.ConfigurationValue; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; +import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; +import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; +import org.junit.jupiter.api.Test; + +/** + * For {@link Compactor} testing. + */ +public class CompactorTest { + private static final int PAGE_SIZE = 1024; + + private final IgniteLogger log = Loggers.forClass(CompactorTest.class); + + @Test + void testStartAndStop() throws Exception { + Compactor compactor = new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE); + + compactor.start(); + + assertNull(compactor.runner()); + + assertFalse(compactor.isCancelled()); + assertFalse(compactor.isDone()); + assertFalse(Thread.currentThread().isInterrupted()); + + compactor.start(); + + assertTrue(waitForCondition(() -> compactor.runner() != null, 10, 1_000)); + + compactor.stop(); + + assertTrue(waitForCondition(() -> compactor.runner() == null, 10, 1_000)); + + assertTrue(compactor.isCancelled()); + assertTrue(compactor.isDone()); + assertFalse(Thread.currentThread().isInterrupted()); + } + + @Test + void testMergeDeltaFileToMainFile() throws Throwable { + Compactor compactor = new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE); + + FilePageStore filePageStore = mock(FilePageStore.class); + DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class); + + when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true); + + when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0}); + + when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean())) + .then(answer -> { + ByteBuffer buffer = answer.getArgument(2); + + PageIo.setPageId(bufferAddress(buffer), 1); + + return true; + }); + + compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo); + + verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(0L), eq(0L), any(ByteBuffer.class), anyBoolean()); + verify(filePageStore, times(1)).write(eq(1L), any(ByteBuffer.class), anyBoolean()); + + verify(filePageStore, times(1)).sync(); + verify(filePageStore, times(1)).removeDeltaFile(eq(deltaFilePageStoreIo)); + + verify(deltaFilePageStoreIo, times(1)).markMergedToFilePageStore(); + verify(deltaFilePageStoreIo, times(1)).stop(eq(true)); + } + + @Test + void testDoCompaction() throws Throwable { + FilePageStore filePageStore = mock(FilePageStore.class); + + DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class); + + when(filePageStore.getDeltaFileToCompaction()).thenReturn(deltaFilePageStoreIo); + + FilePageStoreManager filePageStoreManager = mock(FilePageStoreManager.class); + + when(filePageStoreManager.allPageStores()).thenReturn(List.of(List.of(filePageStore))); + + Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), filePageStoreManager, PAGE_SIZE)); + + doAnswer(answer -> { + assertSame(filePageStore, answer.getArgument(0)); + assertSame(deltaFilePageStoreIo, answer.getArgument(1)); + + return null; + }) + .when(compactor) + .mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class)); + + compactor.doCompaction(); + + verify(filePageStore, times(1)).getDeltaFileToCompaction(); + + verify(compactor, times(1)).mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class)); + } + + @Test + void testBody() throws Exception { + Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE)); + + doNothing().when(compactor).waitDeltaFiles(); + + doAnswer(answer -> { + compactor.cancel(); + + return null; + }).when(compactor).doCompaction(); + + compactor.body(); + + verify(compactor, times(3)).isCancelled(); + verify(compactor, times(1)).waitDeltaFiles(); + verify(compactor, times(1)).doCompaction(); + } + + @Test + void testWaitDeltaFiles() throws Exception { + Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE)); + + CompletableFuture waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles); + + assertThrows(TimeoutException.class, () -> waitDeltaFilesFuture.get(100, MILLISECONDS)); + + compactor.addDeltaFiles(1); + + waitDeltaFilesFuture.get(100, MILLISECONDS); + } + + @Test + void testCancel() throws Exception { + Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE)); + + assertFalse(compactor.isCancelled()); + + CompletableFuture waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles); + + assertThrows(TimeoutException.class, () -> waitDeltaFilesFuture.get(100, MILLISECONDS)); + + compactor.cancel(); + + assertTrue(compactor.isCancelled()); + assertFalse(Thread.currentThread().isInterrupted()); + + waitDeltaFilesFuture.get(100, MILLISECONDS); + } + + private static ConfigurationValue threadsConfig(int threads) { + ConfigurationValue configValue = mock(ConfigurationValue.class); + + when(configValue.value()).thenReturn(threads); + + return configValue; + } +} diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java index b72d7783e1d6..cd0cd9bbc749 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java @@ -227,7 +227,7 @@ void testRead() throws Exception { long expPageId = createDataPageId(() -> 0); - ByteBuffer pageByteBuffer = createPageByteBuffer(0, PAGE_SIZE); + ByteBuffer pageByteBuffer = createPageByteBuffer(expPageId, PAGE_SIZE); // Puts random bytes after: type (2 byte) + version (2 byte) + crc (4 byte). pageByteBuffer.position(8).put(randomBytes(128)); diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java index 27beb4c08c32..cebf1bd4a957 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java @@ -23,14 +23,21 @@ import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA; import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.DELTA_FILE_VERSION_1; import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.arr; +import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createDataPageId; +import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createPageByteBuffer; +import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.randomBytes; import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.stream.IntStream; import org.apache.ignite.internal.fileio.FileIo; @@ -50,11 +57,19 @@ void testPageOffset() throws Exception { try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(testFilePath, header)) { assertEquals(PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 0))); + assertEquals(PAGE_SIZE, filePageStoreIo.pageOffset(0)); + assertEquals(2 * PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 1))); + assertEquals(2 * PAGE_SIZE, filePageStoreIo.pageOffset(1)); + assertEquals(3 * PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 2))); + assertEquals(3 * PAGE_SIZE, filePageStoreIo.pageOffset(2)); assertEquals(-1, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 3))); + assertEquals(-1, filePageStoreIo.pageOffset(3)); + assertEquals(-1, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 4))); + assertEquals(-1, filePageStoreIo.pageOffset(4)); } } @@ -134,6 +149,51 @@ void testCheckHeader() throws Exception { } } + @Test + void testMergedToFilePageStore() throws Exception { + DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2)); + + try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(workDir.resolve("test"), header)) { + // Preparation for reading. + long pageId = createDataPageId(() -> 0); + + ByteBuffer pageByteBuffer = createPageByteBuffer(pageId, PAGE_SIZE); + + // Puts random bytes after: type (2 byte) + version (2 byte) + crc (4 byte). + pageByteBuffer.position(8).put(randomBytes(128)); + + filePageStoreIo.write(pageId, pageByteBuffer.rewind(), true); + + filePageStoreIo.sync(); + + // Checking readings. + ByteBuffer buffer = ByteBuffer.allocateDirect(PAGE_SIZE).order(pageByteBuffer.order()); + + long pageOff = filePageStoreIo.pageOffset(pageId); + + assertTrue(filePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, buffer, false)); + + assertEquals(pageByteBuffer.rewind(), buffer.rewind()); + + buffer.rewind().put(new byte[PAGE_SIZE]); + + filePageStoreIo.markMergedToFilePageStore(); + + assertFalse(filePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, buffer.rewind(), false)); + + assertEquals(ByteBuffer.allocateDirect(PAGE_SIZE).order(pageByteBuffer.order()), buffer.rewind()); + } + } + + @Test + void testPageIndexes() throws Exception { + DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2)); + + try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(workDir.resolve("test"), header)) { + assertArrayEquals(arr(0, 1, 2), filePageStoreIo.pageIndexes()); + } + } + /** {@inheritDoc} */ @Override DeltaFilePageStoreIo createFilePageStoreIo(Path filePath) { diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java index 959db398313f..52588245cf7a 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java @@ -45,6 +45,7 @@ import java.nio.file.Path; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.stream.Stream; import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory; @@ -351,6 +352,24 @@ void testDeltaFilePageStorePath() throws Exception { ); } + @Test + void testAllPageStores() throws Exception { + FilePageStoreManager manager = createManager(); + + manager.start(); + + manager.initialize("test0", 1, 1); + manager.initialize("test1", 2, 1); + + assertThat( + manager.allPageStores().stream().flatMap(List::stream).map(FilePageStore::filePath).collect(toList()), + containsInAnyOrder( + workDir.resolve("db/table-1").resolve("part-0.bin"), + workDir.resolve("db/table-2").resolve("part-0.bin") + ) + ); + } + private FilePageStoreManager createManager() throws Exception { return new FilePageStoreManager(log, "test", workDir, new RandomAccessFileIoFactory(), 1024); } diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java index 19e6b20aedb6..573522291e4e 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.pagememory.persistence.store; +import static java.nio.ByteOrder.nativeOrder; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.DELTA_FILE_VERSION_1; import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.VERSION_1; @@ -29,8 +30,10 @@ import static org.hamcrest.Matchers.contains; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -38,6 +41,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.nio.file.Path; @@ -411,6 +415,94 @@ void testGetNewDeltaFile() throws Exception { } } + @Test + void testRemoveDeltaFile() throws Exception { + DeltaFilePageStoreIo deltaFile0 = mock(DeltaFilePageStoreIo.class); + DeltaFilePageStoreIo deltaFile1 = mock(DeltaFilePageStoreIo.class); + + try (FilePageStore filePageStore = createFilePageStore(workDir.resolve("test"), deltaFile0, deltaFile1)) { + assertEquals(2, filePageStore.deltaFileCount()); + + assertTrue(filePageStore.removeDeltaFile(deltaFile0)); + assertFalse(filePageStore.removeDeltaFile(deltaFile0)); + + assertEquals(1, filePageStore.deltaFileCount()); + + assertTrue(filePageStore.removeDeltaFile(deltaFile1)); + assertFalse(filePageStore.removeDeltaFile(deltaFile1)); + + assertEquals(0, filePageStore.deltaFileCount()); + } + } + + @Test + void testGetDeltaFileToCompaction() throws Exception { + DeltaFilePageStoreIo deltaFile0 = mock(DeltaFilePageStoreIo.class); + DeltaFilePageStoreIo deltaFile1 = mock(DeltaFilePageStoreIo.class); + + when(deltaFile0.fileIndex()).thenReturn(0); + when(deltaFile0.fileIndex()).thenReturn(1); + + try (FilePageStore filePageStore = createFilePageStore(workDir.resolve("test"), deltaFile0, deltaFile1)) { + assertSame(deltaFile1, filePageStore.getDeltaFileToCompaction()); + + CompletableFuture createNewDeltaFileFuture = filePageStore.getOrCreateNewDeltaFile( + index -> workDir.resolve("delta" + index), + TestPageStoreUtils::arr + ); + + createNewDeltaFileFuture.get(1, SECONDS); + + assertSame(deltaFile1, filePageStore.getDeltaFileToCompaction()); + + filePageStore.removeDeltaFile(deltaFile1); + + assertSame(deltaFile0, filePageStore.getDeltaFileToCompaction()); + + filePageStore.removeDeltaFile(deltaFile0); + + assertNull(filePageStore.getDeltaFileToCompaction()); + + filePageStore.completeNewDeltaFile(); + + assertSame(createNewDeltaFileFuture.join(), filePageStore.getDeltaFileToCompaction()); + + filePageStore.removeDeltaFile(createNewDeltaFileFuture.join()); + + assertNull(filePageStore.getDeltaFileToCompaction()); + } + } + + @Test + void testReadWithMergedDeltaFiles() throws Exception { + RandomAccessFileIoFactory ioFactory = new RandomAccessFileIoFactory(); + + FilePageStoreHeader header = new FilePageStoreHeader(VERSION_1, PAGE_SIZE); + DeltaFilePageStoreIoHeader deltaHeader = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 0, PAGE_SIZE, arr(0)); + + try ( + DeltaFilePageStoreIo deltaIo = spy(new DeltaFilePageStoreIo(ioFactory, deltaFilePath(0), deltaHeader)); + FilePageStoreIo storeIo = spy(new FilePageStoreIo(ioFactory, workDir.resolve("test"), header)); + FilePageStore filePageStore = new FilePageStore(storeIo, deltaIo); + ) { + long pageId = createDataPageId(filePageStore::allocatePage); + + ByteBuffer buffer = ByteBuffer.allocateDirect(PAGE_SIZE).order(nativeOrder()); + + filePageStore.read(pageId, buffer, true); + + verify(deltaIo, times(1)).readWithMergedToFilePageStoreCheck(eq(pageId), anyLong(), eq(buffer), eq(true)); + verify(storeIo, times(0)).read(eq(pageId), anyLong(), eq(buffer), eq(true)); + + deltaIo.markMergedToFilePageStore(); + + filePageStore.read(pageId, buffer.rewind(), true); + + verify(deltaIo, times(2)).readWithMergedToFilePageStoreCheck(eq(pageId), anyLong(), eq(buffer), eq(true)); + verify(storeIo, times(1)).read(eq(pageId), anyLong(), eq(buffer), eq(true)); + } + } + private static FilePageStore createFilePageStore(Path filePath) { return createFilePageStore(filePath, new FilePageStoreHeader(VERSION_1, PAGE_SIZE)); } diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java deleted file mode 100644 index 723669d97d59..000000000000 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.pagememory.persistence.store; - -import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.nio.ByteBuffer; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** - * For {@link PageReadWriteManagerImpl} testing. - */ -public class PageReadWriteManagerImplTest { - private FilePageStore filePageStore; - - private FilePageStoreManager filePageStoreManager; - - private PageReadWriteManagerImpl pageReadWriteManager; - - @BeforeEach - void setUp() throws Exception { - filePageStore = mock(FilePageStore.class); - - filePageStoreManager = mock(FilePageStoreManager.class); - - when(filePageStoreManager.getStore(0, 0)).thenReturn(filePageStore); - - pageReadWriteManager = new PageReadWriteManagerImpl(filePageStoreManager); - } - - @Test - void testRead() throws Exception { - long pageId = pageId(0, (byte) 0, 0); - - ByteBuffer pageBuffer = mock(ByteBuffer.class); - - pageReadWriteManager.read(0, pageId, pageBuffer, true); - - verify(filePageStore, times(1)).read(pageId, pageBuffer, true); - } - - @Test - void testWrite() throws Exception { - long pageId = pageId(0, (byte) 0, 0); - - ByteBuffer pageBuffer = mock(ByteBuffer.class); - - assertSame(filePageStore, pageReadWriteManager.write(0, pageId, pageBuffer, true)); - - verify(filePageStore, times(1)).write(pageId, pageBuffer, true); - } - - @Test - void testAllocatePage() throws Exception { - assertEquals( - pageId(0, (byte) 1, 0), - pageReadWriteManager.allocatePage(0, 0, (byte) 1) - ); - - verify(filePageStore, times(1)).allocatePage(); - } -} diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java index 5299301230cc..593ce5c59bd9 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java @@ -72,6 +72,12 @@ public void start() throws StorageException { try { dataRegion.filePageStoreManager().initialize(tableView.name(), tableView.tableId(), tableView.partitions()); + + int deltaFileCount = dataRegion.filePageStoreManager().getStores(tableView.tableId()).stream() + .mapToInt(FilePageStore::deltaFileCount) + .sum(); + + dataRegion.checkpointManager().addDeltaFileCountForCompaction(deltaFileCount); } catch (IgniteInternalCheckedException e) { throw new StorageException("Error initializing file page stores for table: " + tableView.name(), e); }