From 5d4c166ee982cf23553f150b2ae60c63ab26b81e Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Tue, 2 Aug 2022 14:30:35 +0300 Subject: [PATCH 01/10] IGNITE-16657 wip --- .../persistence/compaction/Compactor.java | 277 ++++++++++++++++++ .../persistence/store/FilePageStore.java | 41 +++ .../store/FilePageStoreManager.java | 7 + .../persistence/compaction/CompactorTest.java | 24 ++ .../store/FilePageStoreManagerTest.java | 19 ++ .../persistence/store/FilePageStoreTest.java | 61 ++++ 6 files changed, 429 insertions(+) create mode 100644 modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java create mode 100644 modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java new file mode 100644 index 000000000000..7bc091dde625 --- /dev/null +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.compaction; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.stream.Collectors.toCollection; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.configuration.ConfigurationValue; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; +import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; +import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; +import org.apache.ignite.internal.thread.IgniteThread; +import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.worker.IgniteWorker; +import org.apache.ignite.internal.util.worker.IgniteWorkerListener; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInternalException; +import org.jetbrains.annotations.Nullable; + +/** + * Empty. + */ +// TODO: IGNITE-16657 добавить описание +public class Compactor extends IgniteWorker { + private final Object mux = new Object(); + + private final @Nullable ThreadPoolExecutor threadPoolExecutor; + + private final AtomicInteger deltaFileCount = new AtomicInteger(); + + private final FilePageStoreManager filePageStoreManager; + + /** Thread local with buffers for the compaction threads. */ + private final ThreadLocal threadBuf; + + /** + * Creates new ignite worker with given parameters. + * + * @param log Logger. + * @param igniteInstanceName Name of the Ignite instance this runnable is used in. + * @param listener Listener for life-cycle events. + * @param threads Number of compaction threads. + * @param filePageStoreManager File page store manager. + * @param pageSize Page size in bytes. + */ + public Compactor( + IgniteLogger log, + String igniteInstanceName, + @Nullable IgniteWorkerListener listener, + ConfigurationValue threads, + FilePageStoreManager filePageStoreManager, + int pageSize + ) { + super(log, igniteInstanceName, "compaction-thread", listener); + + this.filePageStoreManager = filePageStoreManager; + + threadBuf = ThreadLocal.withInitial(() -> { + ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize); + + tmpWriteBuf.order(ByteOrder.nativeOrder()); + + return tmpWriteBuf; + }); + + int threadCount = threads.value(); + + if (threadCount > 1) { + threadPoolExecutor = new ThreadPoolExecutor( + threadCount, + threadCount, + 30_000, + MILLISECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("compaction-runner-io", log) + ); + } else { + threadPoolExecutor = null; + } + } + + /** {@inheritDoc} */ + @Override + protected void body() throws InterruptedException { + try { + while (!isCancelled()) { + waitDeltaFiles(); + + if (isCancelled()) { + log.info("Skipping the delta file compaction because the node is stopping"); + + return; + } + + doCompaction(); + } + } catch (Throwable t) { + // TODO: IGNITE-16899 By analogy with 2.0, we need to handle the exception (err) by the FailureProcessor + + throw new IgniteInternalException(t); + } + } + + /** + * Waiting for delta files. + */ + void waitDeltaFiles() { + try { + synchronized (mux) { + while (deltaFileCount.get() == 0 && !isCancelled()) { + mux.wait(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + isCancelled.set(true); + } + } + + /** + * Merges delta files with partition files. + */ + void doCompaction() { + // Let's collect one delta file for each partition. + Queue> queue = filePageStoreManager.allPageStores().stream() + .flatMap(List::stream) + .map(filePageStore -> { + DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction(); + + return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction); + }) + .filter(Objects::nonNull) + .collect(toCollection(ConcurrentLinkedQueue::new)); + + assert !queue.isEmpty(); + + int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize(); + + CompletableFuture[] futures = new CompletableFuture[threads]; + + for (int i = 0; i < threads; i++) { + CompletableFuture future = futures[i] = new CompletableFuture<>(); + + Runnable merger = () -> { + IgniteBiTuple toMerge; + + while ((toMerge = queue.poll()) != null) { + mergeDeltaFileToMainFile(toMerge.get1(), toMerge.get2(), future); + } + }; + + if (threadPoolExecutor == null) { + merger.run(); + } else { + try { + threadPoolExecutor.execute(merger); + } catch (RejectedExecutionException ignore) { + // Run the task synchronously. + merger.run(); + } + } + } + + // Wait and check for errors. + CompletableFuture.allOf(futures).join(); + } + + /** + * Starts the compacter. + */ + public void start() { + if (runner() != null) { + return; + } + + assert runner() == null : "Compacter is running"; + + new IgniteThread(this).start(); + } + + /** + * Stops the compacter. + */ + public void stop() throws Exception { + cancel(); + + boolean interrupt = false; + + while (true) { + try { + join(); + + break; + } catch (InterruptedException e) { + interrupt = true; + } + } + + if (interrupt) { + Thread.currentThread().interrupt(); + } + + if (threadPoolExecutor != null) { + IgniteUtils.shutdownAndAwaitTermination(threadPoolExecutor, 2, TimeUnit.MINUTES); + } + } + + /** {@inheritDoc} */ + @Override + public void cancel() { + if (log.isDebugEnabled()) { + log.debug("Cancelling grid runnable: " + this); + } + + // Do not interrupt runner thread. + isCancelled.set(true); + + synchronized (mux) { + mux.notifyAll(); + } + } + + /** + * Merges the main file page store with the delta file page store. + * + *

Steps: + *

    + *
  • Copy pages from delta file page store to file page store.
  • + *
  • Fsync the file page store.
  • + *
  • Removing the delta file page store from a file page store.
  • + *
+ * + * @param filePageStore File page store. + * @param deltaFilePageStore Delta file page store. + * @param future Future that should complete when the merge is complete. + */ + void mergeDeltaFileToMainFile(FilePageStore filePageStore, DeltaFilePageStoreIo deltaFilePageStore, CompletableFuture future) { + try { + // Copy pages deltaFilePageStore -> filePageStore. + ByteBuffer buffer = threadBuf.get(); + + // TODO: IGNITE-16657 вот тут надо писать основную логику + } catch (Throwable e) { + future.completeExceptionally(e); + } + } +} diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java index 9ff231861120..384ef4152ba2 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java @@ -26,6 +26,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; @@ -344,4 +345,44 @@ public void completeNewDeltaFile() { public int deltaFileCount() { return deltaFilePageStoreIos.size(); } + + /** + * Returns the delta file to compaction (oldest). + * + *

Thread safe. + */ + public @Nullable DeltaFilePageStoreIo getDeltaFileToCompaction() { + // Snapshot of delta files. + Iterator iterator = deltaFilePageStoreIos.iterator(); + + // Last one is the oldest. + DeltaFilePageStoreIo last = null; + + int count = 0; + + while (iterator.hasNext()) { + last = iterator.next(); + + count++; + } + + // If last is just created, then it cannot be compacted yet. + if (count == 1 && newDeltaFilePageStoreIoFuture != null) { + last = null; + } + + return last; + } + + /** + * Deletes delta file. + * + *

Thread safe. + * + * @param deltaFilePageStoreIo Delta file to be deleted. + * @return {@code True} if the delta file being removed was present. + */ + public boolean removeDeltaFile(DeltaFilePageStoreIo deltaFilePageStoreIo) { + return deltaFilePageStoreIos.remove(deltaFilePageStoreIo); + } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java index 64d67fd2829b..d2a28abac6d8 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java @@ -239,6 +239,13 @@ public void initialize(String tableName, int tableId, int partitions) throws Ign return groupPageStores.get(grpId); } + /** + * Returns all page stores of all groups. + */ + public Collection> allPageStores() { + return groupPageStores.allPageStores(); + } + /** * Returns partition file page store for the corresponding parameters. * diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java new file mode 100644 index 000000000000..e5803fa8581a --- /dev/null +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.compaction; + +/** + * For {@link Compactor} testing. + */ +public class CompactorTest { +} diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java index 959db398313f..52588245cf7a 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java @@ -45,6 +45,7 @@ import java.nio.file.Path; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.stream.Stream; import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory; @@ -351,6 +352,24 @@ void testDeltaFilePageStorePath() throws Exception { ); } + @Test + void testAllPageStores() throws Exception { + FilePageStoreManager manager = createManager(); + + manager.start(); + + manager.initialize("test0", 1, 1); + manager.initialize("test1", 2, 1); + + assertThat( + manager.allPageStores().stream().flatMap(List::stream).map(FilePageStore::filePath).collect(toList()), + containsInAnyOrder( + workDir.resolve("db/table-1").resolve("part-0.bin"), + workDir.resolve("db/table-2").resolve("part-0.bin") + ) + ); + } + private FilePageStoreManager createManager() throws Exception { return new FilePageStoreManager(log, "test", workDir, new RandomAccessFileIoFactory(), 1024); } diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java index 19e6b20aedb6..56b0bca76413 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java @@ -29,8 +29,10 @@ import static org.hamcrest.Matchers.contains; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -38,6 +40,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.nio.file.Path; @@ -411,6 +414,64 @@ void testGetNewDeltaFile() throws Exception { } } + @Test + void testRemoveDeltaFile() throws Exception { + DeltaFilePageStoreIo deltaFile0 = mock(DeltaFilePageStoreIo.class); + DeltaFilePageStoreIo deltaFile1 = mock(DeltaFilePageStoreIo.class); + + try (FilePageStore filePageStore = createFilePageStore(workDir.resolve("test"), deltaFile0, deltaFile1)) { + assertEquals(2, filePageStore.deltaFileCount()); + + assertTrue(filePageStore.removeDeltaFile(deltaFile0)); + assertFalse(filePageStore.removeDeltaFile(deltaFile0)); + + assertEquals(1, filePageStore.deltaFileCount()); + + assertTrue(filePageStore.removeDeltaFile(deltaFile1)); + assertFalse(filePageStore.removeDeltaFile(deltaFile1)); + + assertEquals(0, filePageStore.deltaFileCount()); + } + } + + @Test + void testGetDeltaFileToCompaction() throws Exception { + DeltaFilePageStoreIo deltaFile0 = mock(DeltaFilePageStoreIo.class); + DeltaFilePageStoreIo deltaFile1 = mock(DeltaFilePageStoreIo.class); + + when(deltaFile0.fileIndex()).thenReturn(0); + when(deltaFile0.fileIndex()).thenReturn(1); + + try (FilePageStore filePageStore = createFilePageStore(workDir.resolve("test"), deltaFile0, deltaFile1)) { + assertSame(deltaFile1, filePageStore.getDeltaFileToCompaction()); + + CompletableFuture createNewDeltaFileFuture = filePageStore.getOrCreateNewDeltaFile( + index -> workDir.resolve("delta" + index), + TestPageStoreUtils::arr + ); + + createNewDeltaFileFuture.get(1, SECONDS); + + assertSame(deltaFile1, filePageStore.getDeltaFileToCompaction()); + + filePageStore.removeDeltaFile(deltaFile1); + + assertSame(deltaFile0, filePageStore.getDeltaFileToCompaction()); + + filePageStore.removeDeltaFile(deltaFile0); + + assertNull(filePageStore.getDeltaFileToCompaction()); + + filePageStore.completeNewDeltaFile(); + + assertSame(createNewDeltaFileFuture.join(), filePageStore.getDeltaFileToCompaction()); + + filePageStore.removeDeltaFile(createNewDeltaFileFuture.join()); + + assertNull(filePageStore.getDeltaFileToCompaction()); + } + } + private static FilePageStore createFilePageStore(Path filePath) { return createFilePageStore(filePath, new FilePageStoreHeader(VERSION_1, PAGE_SIZE)); } From a6feb9350e130888fa7a4f7d772d3feb5521e81e Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Tue, 2 Aug 2022 17:18:46 +0300 Subject: [PATCH 02/10] IGNITE-16657 wip --- .../checkpoint/CheckpointPagesWriter.java | 1 + .../persistence/compaction/Compactor.java | 62 ++++++++++++++++++- .../store/AbstractFilePageStoreIo.java | 8 ++- .../store/DeltaFilePageStoreIo.java | 14 ++++- .../persistence/store/FilePageStore.java | 12 +--- .../store/PageReadWriteManagerImpl.java | 1 + 6 files changed, 81 insertions(+), 17 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java index 6da756a10324..140cbb4b9f91 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java @@ -76,6 +76,7 @@ public class CheckpointPagesWriter implements Runnable { private final CompletableFuture doneFut; /** Some action which will be executed every time before page will be written. */ + // TODO: IGNITE-16657 переименовать в обновление хертбитов private final Runnable beforePageWrite; /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */ diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java index 7bc091dde625..a152a8f04230 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.pagememory.io.PageIo; import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; @@ -137,7 +138,13 @@ void waitDeltaFiles() { try { synchronized (mux) { while (deltaFileCount.get() == 0 && !isCancelled()) { - mux.wait(); + blockingSectionBegin(); + + try { + mux.wait(); + } finally { + blockingSectionEnd(); + } } } } catch (InterruptedException e) { @@ -164,6 +171,8 @@ void doCompaction() { assert !queue.isEmpty(); + updateHeartbeat(); + int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize(); CompletableFuture[] futures = new CompletableFuture[threads]; @@ -174,11 +183,15 @@ void doCompaction() { Runnable merger = () -> { IgniteBiTuple toMerge; - while ((toMerge = queue.poll()) != null) { + while ((toMerge = queue.poll()) != null && !isCancelled()) { mergeDeltaFileToMainFile(toMerge.get1(), toMerge.get2(), future); } }; + if (isCancelled()) { + return; + } + if (threadPoolExecutor == null) { merger.run(); } else { @@ -191,6 +204,8 @@ void doCompaction() { } } + updateHeartbeat(); + // Wait and check for errors. CompletableFuture.allOf(futures).join(); } @@ -269,6 +284,49 @@ void mergeDeltaFileToMainFile(FilePageStore filePageStore, DeltaFilePageStoreIo // Copy pages deltaFilePageStore -> filePageStore. ByteBuffer buffer = threadBuf.get(); + for (long pageOffset : deltaFilePageStore.pageOffsets()) { + updateHeartbeat(); + + if (isCancelled()) { + return; + } + + // -1 because we don't know the pageId yet. + deltaFilePageStore.read(-1, pageOffset, buffer.rewind(), true); + + long pageId = PageIo.getPageId(buffer.rewind()); + + assert pageId != 0; + + updateHeartbeat(); + + if (isCancelled()) { + return; + } + + filePageStore.write(pageId, buffer.rewind(), true); + } + + // Fsync the file page store. + updateHeartbeat(); + + if (isCancelled()) { + return; + } + + filePageStore.sync(); + + // Removing the delta file page store from a file page store. + updateHeartbeat(); + + if (isCancelled()) { + return; + } + + boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore); + + assert removed : filePageStore.filePath(); + // TODO: IGNITE-16657 вот тут надо писать основную логику } catch (Throwable e) { future.completeExceptionally(e); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java index 9c13199387e8..dd2082eda62f 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java @@ -64,7 +64,11 @@ public abstract class AbstractFilePageStoreIo implements Closeable { /** Initialized file page store IO. */ private volatile boolean initialized; - /** Caches the existence state of file. After it is initialized, it will be not {@code null} during lifecycle. */ + /** + * Caches the existence state of file. After it is initialized, it will be not {@code null} during lifecycle. + * + *

Guarded by {@link #readWriteLock}. + */ private @Nullable Boolean fileExists; /** @@ -513,8 +517,6 @@ private void read0( if (keepCrc) { PageIo.setCrc(pageBuf, savedCrc32); } - - return; } catch (IOException e) { throw new IgniteInternalCheckedException("Failed to read page [file=" + filePath + ", pageId=" + pageId + "]", e); } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java index 2fd974a82b1c..59cac13510f9 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Path; +import java.util.stream.IntStream; import org.apache.ignite.internal.fileio.FileIo; import org.apache.ignite.internal.fileio.FileIoFactory; @@ -101,7 +102,7 @@ public long pageOffset(long pageId) { return -1; } - return (long) searchResult * pageSize() + headerSize(); + return pageOffset(searchResult); } /** @@ -110,4 +111,15 @@ public long pageOffset(long pageId) { public int fileIndex() { return header.index(); } + + /** + * Returns page offsets within the store file. + */ + public long[] pageOffsets() { + return IntStream.of(header.pageIndexes()).mapToLong(this::pageOffset).toArray(); + } + + private long pageOffset(int pagePosition) { + return (long) pagePosition * pageSize() + headerSize(); + } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java index 384ef4152ba2..2328671c078f 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java @@ -183,17 +183,7 @@ public void readWithoutPageIdCheck(long pageId, ByteBuffer pageBuf, boolean keep public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { assert pageIndex(pageId) <= pageCount : "pageIdx=" + pageIndex(pageId) + ", pageCount=" + pageCount; - for (DeltaFilePageStoreIo deltaFilePageStoreIo : deltaFilePageStoreIos) { - long pageOff = deltaFilePageStoreIo.pageOffset(pageId); - - if (pageOff >= 0) { - deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc); - - return; - } - } - - filePageStoreIo.read(pageId, filePageStoreIo.pageOffset(pageId), pageBuf, keepCrc); + readWithoutPageIdCheck(pageId, pageBuf, keepCrc); } /** {@inheritDoc} */ diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java index 94e47164f66f..d66fa993ba4e 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java @@ -28,6 +28,7 @@ /** * {@link org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager} implementation. */ +// TODO: IGNITE-16657 рассмотреть его удаление class PageReadWriteManagerImpl implements org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager { @IgniteToStringExclude protected final FilePageStoreManager filePageStoreManager; From 93075af7baa14b789241d5e12f0714dcf5fd2796 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 3 Aug 2022 11:35:09 +0300 Subject: [PATCH 03/10] IGNITE-16657 wip --- .../persistence/compaction/Compactor.java | 9 +-- .../store/AbstractFilePageStoreIo.java | 2 +- .../store/DeltaFilePageStoreIo.java | 48 +++++++++++++++- .../persistence/store/FilePageStore.java | 6 +- .../persistence/store/FilePageStoreIo.java | 7 +++ .../persistence/store/PageStore.java | 3 +- .../store/AbstractFilePageStoreIoTest.java | 2 +- .../store/DeltaFilePageStoreIoTest.java | 55 +++++++++++++++++++ .../persistence/store/FilePageStoreTest.java | 31 +++++++++++ 9 files changed, 150 insertions(+), 13 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java index a152a8f04230..5a6e08b9db44 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -291,12 +291,11 @@ void mergeDeltaFileToMainFile(FilePageStore filePageStore, DeltaFilePageStoreIo return; } - // -1 because we don't know the pageId yet. - deltaFilePageStore.read(-1, pageOffset, buffer.rewind(), true); + deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageOffset, pageOffset, buffer.rewind(), true); long pageId = PageIo.getPageId(buffer.rewind()); - assert pageId != 0; + assert pageId != 0 : deltaFilePageStore.filePath(); updateHeartbeat(); @@ -327,7 +326,9 @@ void mergeDeltaFileToMainFile(FilePageStore filePageStore, DeltaFilePageStoreIo assert removed : filePageStore.filePath(); - // TODO: IGNITE-16657 вот тут надо писать основную логику + deltaFilePageStore.markMergedToFilePageStore(); + + deltaFilePageStore.stop(true); } catch (Throwable e) { future.completeExceptionally(e); } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java index dd2082eda62f..7043f90abf3b 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java @@ -144,7 +144,7 @@ public void close() throws IOException { * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}. * @throws IgniteInternalCheckedException If reading failed (IO error occurred). */ - public void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { + protected void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { read0(pageId, pageOff, pageBuf, !skipCrc, keepCrc); } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java index 59cac13510f9..eb134255a4d7 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java @@ -32,6 +32,8 @@ import java.util.stream.IntStream; import org.apache.ignite.internal.fileio.FileIo; import org.apache.ignite.internal.fileio.FileIoFactory; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.lang.IgniteInternalCheckedException; /** * Implementation of the class for working with the delta file page storage IO. @@ -39,6 +41,9 @@ public class DeltaFilePageStoreIo extends AbstractFilePageStoreIo { private final DeltaFilePageStoreIoHeader header; + /** Lock to prevent reads after merging with a file page store. */ + private final IgniteSpinBusyLock mergedBusyLock = new IgniteSpinBusyLock(); + /** * Constructor. * @@ -105,6 +110,39 @@ public long pageOffset(long pageId) { return pageOffset(searchResult); } + private long pageOffset(int pagePosition) { + return (long) pagePosition * pageSize() + headerSize(); + } + + /** + * Reads a page. + * + * @param pageId Page ID. + * @param pageBuf Page buffer to read into. + * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}. + * @return {@code True} if the page was successfully read, otherwise the delta file page store is {@link #markMergedToFilePageStore() + * merged} with the file page store (must be read from file page store). + * @throws IgniteInternalCheckedException If reading failed (IO error occurred). + */ + public boolean readWithMergedToFilePageStoreCheck( + long pageId, + long pageOff, + ByteBuffer pageBuf, + boolean keepCrc + ) throws IgniteInternalCheckedException { + if (!mergedBusyLock.enterBusy()) { + return false; + } + + try { + super.read(pageId, pageOff, pageBuf, keepCrc); + + return true; + } finally { + mergedBusyLock.leaveBusy(); + } + } + /** * Returns the index of the delta file page store. */ @@ -119,7 +157,13 @@ public long[] pageOffsets() { return IntStream.of(header.pageIndexes()).mapToLong(this::pageOffset).toArray(); } - private long pageOffset(int pagePosition) { - return (long) pagePosition * pageSize() + headerSize(); + /** + * Marks that the delta file page store has been merged with the file page store. + * + *

It waits for all current {@link #readWithMergedToFilePageStoreCheck(long, long, ByteBuffer, boolean) readings} to end, and + * subsequent ones will return {@code false} (will need to read from the file page store not from delta file page store). + */ + public void markMergedToFilePageStore() { + mergedBusyLock.block(); } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java index 2328671c078f..5a3b3a2e1825 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java @@ -169,9 +169,9 @@ public void readWithoutPageIdCheck(long pageId, ByteBuffer pageBuf, boolean keep long pageOff = deltaFilePageStoreIo.pageOffset(pageId); if (pageOff >= 0) { - deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc); - - return; + if (deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, pageBuf, keepCrc)) { + return; + } } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java index 0d340c9e9b59..e14d397bf533 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.fileio.FileIo; import org.apache.ignite.internal.fileio.FileIoFactory; import org.apache.ignite.internal.pagememory.util.PageIdUtils; +import org.apache.ignite.lang.IgniteInternalCheckedException; /** * Implementation of the class for working with the file page file storage IO. @@ -83,6 +84,12 @@ public void checkHeader(FileIo fileIo) throws IOException { checkFilePageSize(this.header.pageSize(), header.pageSize()); } + /** {@inheritDoc} */ + @Override + public void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { + super.read(pageId, pageOff, pageBuf, keepCrc); + } + /** {@inheritDoc} */ @Override public long pageOffset(long pageId) { diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java index 9866ec28ae67..66d778a6a3f2 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java @@ -51,8 +51,7 @@ public interface PageStore extends Closeable { * * @param pageId Page ID. * @param pageBuf Page buffer to read into. - * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code - * keepCrc}. + * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}. * @throws IgniteInternalCheckedException If reading failed (IO error occurred). */ void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException; diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java index b72d7783e1d6..cd0cd9bbc749 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java @@ -227,7 +227,7 @@ void testRead() throws Exception { long expPageId = createDataPageId(() -> 0); - ByteBuffer pageByteBuffer = createPageByteBuffer(0, PAGE_SIZE); + ByteBuffer pageByteBuffer = createPageByteBuffer(expPageId, PAGE_SIZE); // Puts random bytes after: type (2 byte) + version (2 byte) + crc (4 byte). pageByteBuffer.position(8).put(randomBytes(128)); diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java index 27beb4c08c32..7760f8b12241 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java @@ -23,14 +23,21 @@ import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA; import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.DELTA_FILE_VERSION_1; import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.arr; +import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createDataPageId; +import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createPageByteBuffer; +import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.randomBytes; import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.stream.IntStream; import org.apache.ignite.internal.fileio.FileIo; @@ -134,6 +141,54 @@ void testCheckHeader() throws Exception { } } + @Test + void testPageOffsets() throws Exception { + DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2)); + + try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(workDir.resolve("test"), header)) { + assertArrayEquals( + IntStream.of(header.pageIndexes()).mapToLong(index -> ((long) index * PAGE_SIZE) + header.headerSize()).toArray(), + filePageStoreIo.pageOffsets() + ); + } + } + + @Test + void testMergedToFilePageStore() throws Exception { + DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2)); + + try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(workDir.resolve("test"), header)) { + // Preparation for reading. + long pageId = createDataPageId(() -> 0); + + ByteBuffer pageByteBuffer = createPageByteBuffer(pageId, PAGE_SIZE); + + // Puts random bytes after: type (2 byte) + version (2 byte) + crc (4 byte). + pageByteBuffer.position(8).put(randomBytes(128)); + + filePageStoreIo.write(pageId, pageByteBuffer.rewind(), true); + + filePageStoreIo.sync(); + + // Checking readings. + ByteBuffer buffer = ByteBuffer.allocateDirect(PAGE_SIZE).order(pageByteBuffer.order()); + + long pageOff = filePageStoreIo.pageOffset(pageId); + + assertTrue(filePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, buffer, false)); + + assertEquals(pageByteBuffer.rewind(), buffer.rewind()); + + buffer.rewind().put(new byte[PAGE_SIZE]); + + filePageStoreIo.markMergedToFilePageStore(); + + assertFalse(filePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, buffer.rewind(), false)); + + assertEquals(ByteBuffer.allocateDirect(PAGE_SIZE).order(pageByteBuffer.order()), buffer.rewind()); + } + } + /** {@inheritDoc} */ @Override DeltaFilePageStoreIo createFilePageStoreIo(Path filePath) { diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java index 56b0bca76413..573522291e4e 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.pagememory.persistence.store; +import static java.nio.ByteOrder.nativeOrder; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.DELTA_FILE_VERSION_1; import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.VERSION_1; @@ -472,6 +473,36 @@ void testGetDeltaFileToCompaction() throws Exception { } } + @Test + void testReadWithMergedDeltaFiles() throws Exception { + RandomAccessFileIoFactory ioFactory = new RandomAccessFileIoFactory(); + + FilePageStoreHeader header = new FilePageStoreHeader(VERSION_1, PAGE_SIZE); + DeltaFilePageStoreIoHeader deltaHeader = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 0, PAGE_SIZE, arr(0)); + + try ( + DeltaFilePageStoreIo deltaIo = spy(new DeltaFilePageStoreIo(ioFactory, deltaFilePath(0), deltaHeader)); + FilePageStoreIo storeIo = spy(new FilePageStoreIo(ioFactory, workDir.resolve("test"), header)); + FilePageStore filePageStore = new FilePageStore(storeIo, deltaIo); + ) { + long pageId = createDataPageId(filePageStore::allocatePage); + + ByteBuffer buffer = ByteBuffer.allocateDirect(PAGE_SIZE).order(nativeOrder()); + + filePageStore.read(pageId, buffer, true); + + verify(deltaIo, times(1)).readWithMergedToFilePageStoreCheck(eq(pageId), anyLong(), eq(buffer), eq(true)); + verify(storeIo, times(0)).read(eq(pageId), anyLong(), eq(buffer), eq(true)); + + deltaIo.markMergedToFilePageStore(); + + filePageStore.read(pageId, buffer.rewind(), true); + + verify(deltaIo, times(2)).readWithMergedToFilePageStoreCheck(eq(pageId), anyLong(), eq(buffer), eq(true)); + verify(storeIo, times(1)).read(eq(pageId), anyLong(), eq(buffer), eq(true)); + } + } + private static FilePageStore createFilePageStore(Path filePath) { return createFilePageStore(filePath, new FilePageStoreHeader(VERSION_1, PAGE_SIZE)); } From e1c04620e6de39d782b465df6b837f5d2a590f59 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 3 Aug 2022 12:43:49 +0300 Subject: [PATCH 04/10] IGNITE-16657 wip --- .../persistence/compaction/Compactor.java | 8 +- .../persistence/compaction/CompactorTest.java | 154 ++++++++++++++++++ 2 files changed, 161 insertions(+), 1 deletion(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java index 5a6e08b9db44..ebbc29012c54 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -291,7 +291,9 @@ void mergeDeltaFileToMainFile(FilePageStore filePageStore, DeltaFilePageStoreIo return; } - deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageOffset, pageOffset, buffer.rewind(), true); + boolean read = deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageOffset, pageOffset, buffer.rewind(), true); + + assert read : deltaFilePageStore.filePath(); long pageId = PageIo.getPageId(buffer.rewind()); @@ -329,6 +331,10 @@ void mergeDeltaFileToMainFile(FilePageStore filePageStore, DeltaFilePageStoreIo deltaFilePageStore.markMergedToFilePageStore(); deltaFilePageStore.stop(true); + + deltaFileCount.decrementAndGet(); + + future.complete(null); } catch (Throwable e) { future.completeExceptionally(e); } diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java index e5803fa8581a..f45a7d567152 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java @@ -17,8 +17,162 @@ package org.apache.ignite.internal.pagememory.persistence.compaction; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.configuration.ConfigurationValue; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; +import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; +import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; +import org.junit.jupiter.api.Test; + /** * For {@link Compactor} testing. */ public class CompactorTest { + private static final int PAGE_SIZE = 1024; + + private final IgniteLogger log = Loggers.forClass(CompactorTest.class); + + @Test + void testStartAndStop() throws Exception { + Compactor compactor = new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE); + + compactor.start(); + + assertNull(compactor.runner()); + + assertFalse(compactor.isCancelled()); + assertFalse(compactor.isDone()); + assertFalse(Thread.currentThread().isInterrupted()); + + compactor.start(); + + assertTrue(waitForCondition(() -> compactor.runner() != null, 10, 1_000)); + + compactor.stop(); + + assertTrue(waitForCondition(() -> compactor.runner() == null, 10, 1_000)); + + assertTrue(compactor.isCancelled()); + assertTrue(compactor.isDone()); + assertFalse(Thread.currentThread().isInterrupted()); + } + + @Test + void testMergeDeltaFileToMainFile() throws Exception { + Compactor compactor = new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE); + + FilePageStore filePageStore = mock(FilePageStore.class); + DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class); + CompletableFuture future = new CompletableFuture<>(); + + when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true); + + when(deltaFilePageStoreIo.pageOffsets()).thenReturn(new long[]{0}); + + when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean())) + .then(answer -> { + ByteBuffer buffer = answer.getArgument(2); + + PageIo.setPageId(bufferAddress(buffer), 1); + + return true; + }); + + compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, future); + + future.get(1, TimeUnit.SECONDS); + + verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(0L), eq(0L), any(ByteBuffer.class), anyBoolean()); + verify(filePageStore, times(1)).write(eq(1L), any(ByteBuffer.class), anyBoolean()); + + verify(filePageStore, times(1)).sync(); + verify(filePageStore, times(1)).removeDeltaFile(eq(deltaFilePageStoreIo)); + + verify(deltaFilePageStoreIo, times(1)).markMergedToFilePageStore(); + verify(deltaFilePageStoreIo, times(1)).stop(eq(true)); + } + + @Test + void testDoCompaction() { + FilePageStore filePageStore = mock(FilePageStore.class); + + DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class); + + when(filePageStore.getDeltaFileToCompaction()).thenReturn(deltaFilePageStoreIo); + + FilePageStoreManager filePageStoreManager = mock(FilePageStoreManager.class); + + when(filePageStoreManager.allPageStores()).thenReturn(List.of(List.of(filePageStore))); + + Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), filePageStoreManager, PAGE_SIZE)); + + doAnswer(answer -> { + assertSame(filePageStore, answer.getArgument(0)); + assertSame(deltaFilePageStoreIo, answer.getArgument(1)); + + ((CompletableFuture) answer.getArgument(2)).complete(null); + + return null; + }) + .when(compactor) + .mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class), any(CompletableFuture.class)); + + compactor.doCompaction(); + + verify(filePageStore, times(1)).getDeltaFileToCompaction(); + + verify(compactor, times(1)) + .mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class), any(CompletableFuture.class)); + } + + @Test + void testBody() throws Exception { + Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE)); + + doNothing().when(compactor).waitDeltaFiles(); + + doAnswer(answer -> { + compactor.cancel(); + + return null; + }).when(compactor).doCompaction(); + + compactor.body(); + + verify(compactor, times(3)).isCancelled(); + verify(compactor, times(1)).waitDeltaFiles(); + verify(compactor, times(1)).doCompaction(); + } + + private static ConfigurationValue threadsConfig(int threads) { + ConfigurationValue configValue = mock(ConfigurationValue.class); + + when(configValue.value()).thenReturn(threads); + + return configValue; + } } From 9b954f075747b75a8475026dd3da97a6d322d87c Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 3 Aug 2022 12:57:53 +0300 Subject: [PATCH 05/10] IGNITE-16657 wip --- .../persistence/compaction/Compactor.java | 15 ++++++++ .../persistence/compaction/CompactorTest.java | 35 +++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java index ebbc29012c54..5c58163a8846 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -154,6 +154,21 @@ void waitDeltaFiles() { } } + /** + * Adds the number of delta files to compact. + * + * @param count Number of delta files. + */ + public void addDeltaFiles(int count) { + assert count > 0; + + deltaFileCount.addAndGet(count); + + synchronized (mux) { + mux.notifyAll(); + } + } + /** * Merges delta files with partition files. */ diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java index f45a7d567152..7bc7a516ce64 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java @@ -17,11 +17,14 @@ package org.apache.ignite.internal.pagememory.persistence.compaction; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -39,6 +42,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -168,6 +172,37 @@ void testBody() throws Exception { verify(compactor, times(1)).doCompaction(); } + @Test + void testWaitDeltaFiles() throws Exception { + Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE)); + + CompletableFuture waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles); + + assertThrows(TimeoutException.class, () -> waitDeltaFilesFuture.get(100, MILLISECONDS)); + + compactor.addDeltaFiles(1); + + waitDeltaFilesFuture.get(100, MILLISECONDS); + } + + @Test + void testCancel() throws Exception { + Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE)); + + assertFalse(compactor.isCancelled()); + + CompletableFuture waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles); + + assertThrows(TimeoutException.class, () -> waitDeltaFilesFuture.get(100, MILLISECONDS)); + + compactor.cancel(); + + assertTrue(compactor.isCancelled()); + assertFalse(Thread.currentThread().isInterrupted()); + + waitDeltaFilesFuture.get(100, MILLISECONDS); + } + private static ConfigurationValue threadsConfig(int threads) { ConfigurationValue configValue = mock(ConfigurationValue.class); From 8bd2430a6d88c8ab777da5a884626be6ad916b28 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 3 Aug 2022 13:55:40 +0300 Subject: [PATCH 06/10] IGNITE-16657 wip --- ...geMemoryCheckpointConfigurationSchema.java | 7 +- .../checkpoint/CheckpointManager.java | 19 +++- .../persistence/checkpoint/Checkpointer.java | 11 ++- .../persistence/compaction/Compactor.java | 97 ++++++++++--------- .../checkpoint/CheckpointerTest.java | 13 ++- .../persistence/compaction/CompactorTest.java | 17 +--- 6 files changed, 102 insertions(+), 62 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java index b80a18a30185..2352b360a413 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java @@ -39,7 +39,12 @@ public class PageMemoryCheckpointConfigurationSchema { /** Number of checkpoint threads. */ @Range(min = 1) @Value(hasDefault = true) - public int threads = 4; + public int checkpointThreads = 4; + + /** Number of threads to compact delta files. */ + @Range(min = 1) + @Value(hasDefault = true) + public int compactionThreads = 4; /** Timeout for checkpoint read lock acquisition in milliseconds. */ @Range(min = 0) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java index d94782e492a9..dc59c650ad47 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView; +import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor; import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; @@ -76,6 +77,9 @@ public class CheckpointManager { /** File page store manager. */ private final FilePageStoreManager filePageStoreManager; + /** Delta file compactor. */ + private final Compactor compactor; + /** * Constructor. * @@ -133,6 +137,15 @@ public CheckpointManager( pageSize ); + compactor = new Compactor( + Loggers.forClass(Compactor.class), + igniteInstanceName, + workerListener, + checkpointConfig.compactionThreads(), + filePageStoreManager, + pageSize + ); + checkpointer = new Checkpointer( Loggers.forClass(Checkpoint.class), igniteInstanceName, @@ -141,6 +154,7 @@ public CheckpointManager( checkpointWorkflow, checkpointPagesWriterFactory, filePageStoreManager, + compactor, checkpointConfig ); @@ -162,6 +176,8 @@ public void start() { checkpointer.start(); checkpointTimeoutLock.start(); + + compactor.start(); } /** @@ -171,7 +187,8 @@ public void stop() throws Exception { IgniteUtils.closeAll( checkpointTimeoutLock::stop, checkpointer::stop, - checkpointWorkflow::stop + checkpointWorkflow::stop, + compactor::stop ); } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java index f8569013a990..ed329cb1878a 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView; import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; +import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor; import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; @@ -139,6 +140,9 @@ public class Checkpointer extends IgniteWorker { /** File page store manager. */ private final FilePageStoreManager filePageStoreManager; + /** Delta file compactor. */ + private final Compactor compactor; + /** * Constructor. * @@ -149,6 +153,7 @@ public class Checkpointer extends IgniteWorker { * @param checkpointWorkFlow Implementation of checkpoint. * @param factory Page writer factory. * @param filePageStoreManager File page store manager. + * @param compactor Delta file compactor. * @param checkpointConfig Checkpoint configuration. */ Checkpointer( @@ -159,6 +164,7 @@ public class Checkpointer extends IgniteWorker { CheckpointWorkflow checkpointWorkFlow, CheckpointPagesWriterFactory factory, FilePageStoreManager filePageStoreManager, + Compactor compactor, PageMemoryCheckpointConfiguration checkpointConfig ) { super(log, igniteInstanceName, "checkpoint-thread", workerListener); @@ -168,10 +174,11 @@ public class Checkpointer extends IgniteWorker { this.checkpointWorkflow = checkpointWorkFlow; this.checkpointPagesWriterFactory = factory; this.filePageStoreManager = filePageStoreManager; + this.compactor = compactor; scheduledCheckpointProgress = new CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval())); - int checkpointWritePageThreads = checkpointConfig.threads().value(); + int checkpointWritePageThreads = checkpointConfig.checkpointThreads().value(); if (checkpointWritePageThreads > 1) { checkpointWritePagesPool = new ThreadPoolExecutor( @@ -431,6 +438,8 @@ boolean writePages( syncUpdatedPageStores(updatedPartitions); + compactor.addDeltaFiles(updatedPartitions.size()); + if (shutdownNow.getAsBoolean()) { currentCheckpointProgress.fail(new NodeStoppingException("Node is stopping.")); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java index 5c58163a8846..c3649b21d99f 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -160,12 +160,14 @@ void waitDeltaFiles() { * @param count Number of delta files. */ public void addDeltaFiles(int count) { - assert count > 0; + assert count >= 0; - deltaFileCount.addAndGet(count); + if (count > 0) { + deltaFileCount.addAndGet(count); - synchronized (mux) { - mux.notifyAll(); + synchronized (mux) { + mux.notifyAll(); + } } } @@ -198,9 +200,15 @@ void doCompaction() { Runnable merger = () -> { IgniteBiTuple toMerge; - while ((toMerge = queue.poll()) != null && !isCancelled()) { - mergeDeltaFileToMainFile(toMerge.get1(), toMerge.get2(), future); + try { + while ((toMerge = queue.poll()) != null) { + mergeDeltaFileToMainFile(toMerge.get1(), toMerge.get2()); + } + } catch (Throwable ex) { + future.completeExceptionally(ex); } + + future.complete(null); }; if (isCancelled()) { @@ -292,66 +300,63 @@ public void cancel() { * * @param filePageStore File page store. * @param deltaFilePageStore Delta file page store. - * @param future Future that should complete when the merge is complete. + * @throws Throwable If failed. */ - void mergeDeltaFileToMainFile(FilePageStore filePageStore, DeltaFilePageStoreIo deltaFilePageStore, CompletableFuture future) { - try { - // Copy pages deltaFilePageStore -> filePageStore. - ByteBuffer buffer = threadBuf.get(); - - for (long pageOffset : deltaFilePageStore.pageOffsets()) { - updateHeartbeat(); - - if (isCancelled()) { - return; - } + void mergeDeltaFileToMainFile( + FilePageStore filePageStore, + DeltaFilePageStoreIo deltaFilePageStore + ) throws Throwable { + // Copy pages deltaFilePageStore -> filePageStore. + ByteBuffer buffer = threadBuf.get(); + + for (long pageOffset : deltaFilePageStore.pageOffsets()) { + updateHeartbeat(); - boolean read = deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageOffset, pageOffset, buffer.rewind(), true); + if (isCancelled()) { + return; + } - assert read : deltaFilePageStore.filePath(); + boolean read = deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageOffset, pageOffset, buffer.rewind(), false); - long pageId = PageIo.getPageId(buffer.rewind()); + assert read : deltaFilePageStore.filePath(); - assert pageId != 0 : deltaFilePageStore.filePath(); + long pageId = PageIo.getPageId(buffer.rewind()); - updateHeartbeat(); + assert pageId != 0 : deltaFilePageStore.filePath(); - if (isCancelled()) { - return; - } - - filePageStore.write(pageId, buffer.rewind(), true); - } - - // Fsync the file page store. updateHeartbeat(); if (isCancelled()) { return; } - filePageStore.sync(); + filePageStore.write(pageId, buffer.rewind(), true); + } - // Removing the delta file page store from a file page store. - updateHeartbeat(); + // Fsync the file page store. + updateHeartbeat(); - if (isCancelled()) { - return; - } + if (isCancelled()) { + return; + } - boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore); + filePageStore.sync(); - assert removed : filePageStore.filePath(); + // Removing the delta file page store from a file page store. + updateHeartbeat(); - deltaFilePageStore.markMergedToFilePageStore(); + if (isCancelled()) { + return; + } - deltaFilePageStore.stop(true); + boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore); - deltaFileCount.decrementAndGet(); + assert removed : filePageStore.filePath(); - future.complete(null); - } catch (Throwable e) { - future.completeExceptionally(e); - } + deltaFilePageStore.markMergedToFilePageStore(); + + deltaFilePageStore.stop(true); + + deltaFileCount.decrementAndGet(); } } diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java index d4ad1f6b4805..3e5f2b05f1a7 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java @@ -46,6 +46,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -71,6 +72,7 @@ import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager; import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory; import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage; +import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor; import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; @@ -91,7 +93,7 @@ public class CheckpointerTest { private final IgniteLogger log = Loggers.forClass(CheckpointerTest.class); - @InjectConfiguration("mock : {threads=1, frequency=1000, frequencyDeviation=0}") + @InjectConfiguration("mock : {checkpointThreads=1, frequency=1000, frequencyDeviation=0}") private PageMemoryCheckpointConfiguration checkpointConfig; @BeforeAll @@ -116,6 +118,7 @@ void testStartAndStop() throws Exception { createCheckpointWorkflow(EMPTY), createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)), mock(FilePageStoreManager.class), + mock(Compactor.class), checkpointConfig ); @@ -148,6 +151,7 @@ void testScheduleCheckpoint() { mock(CheckpointWorkflow.class), mock(CheckpointPagesWriterFactory.class), mock(FilePageStoreManager.class), + mock(Compactor.class), checkpointConfig )); @@ -247,6 +251,7 @@ void testWaitCheckpointEvent() throws Exception { mock(CheckpointWorkflow.class), mock(CheckpointPagesWriterFactory.class), mock(FilePageStoreManager.class), + mock(Compactor.class), checkpointConfig ); @@ -275,6 +280,7 @@ void testCheckpointBody() throws Exception { createCheckpointWorkflow(EMPTY), createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)), mock(FilePageStoreManager.class), + mock(Compactor.class), checkpointConfig )); @@ -352,6 +358,8 @@ void testDoCheckpoint() throws Exception { when(filePageStore.getNewDeltaFile()).thenReturn(completedFuture(mock(DeltaFilePageStoreIo.class))); + Compactor compactor = mock(Compactor.class); + Checkpointer checkpointer = spy(new Checkpointer( log, "test", @@ -360,6 +368,7 @@ void testDoCheckpoint() throws Exception { createCheckpointWorkflow(dirtyPages), createCheckpointPagesWriterFactory(partitionMetaManager), createFilePageStoreManager(Map.of(new GroupPartitionId(0, 0), filePageStore)), + compactor, checkpointConfig )); @@ -367,6 +376,7 @@ void testDoCheckpoint() throws Exception { verify(dirtyPages, times(1)).toDirtyPageIdQueue(); verify(checkpointer, times(1)).startCheckpointProgress(); + verify(compactor, times(1)).addDeltaFiles(eq(1)); assertEquals(checkpointer.lastCheckpointProgress().currentCheckpointPagesCount(), 3); } @@ -381,6 +391,7 @@ void testNextCheckpointInterval() throws Exception { mock(CheckpointWorkflow.class), mock(CheckpointPagesWriterFactory.class), mock(FilePageStoreManager.class), + mock(Compactor.class), checkpointConfig ); diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java index 7bc7a516ce64..39d8035af208 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java @@ -41,7 +41,6 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.ignite.configuration.ConfigurationValue; import org.apache.ignite.internal.logger.IgniteLogger; @@ -86,12 +85,11 @@ void testStartAndStop() throws Exception { } @Test - void testMergeDeltaFileToMainFile() throws Exception { + void testMergeDeltaFileToMainFile() throws Throwable { Compactor compactor = new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE); FilePageStore filePageStore = mock(FilePageStore.class); DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class); - CompletableFuture future = new CompletableFuture<>(); when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true); @@ -106,9 +104,7 @@ void testMergeDeltaFileToMainFile() throws Exception { return true; }); - compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, future); - - future.get(1, TimeUnit.SECONDS); + compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo); verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(0L), eq(0L), any(ByteBuffer.class), anyBoolean()); verify(filePageStore, times(1)).write(eq(1L), any(ByteBuffer.class), anyBoolean()); @@ -121,7 +117,7 @@ void testMergeDeltaFileToMainFile() throws Exception { } @Test - void testDoCompaction() { + void testDoCompaction() throws Throwable { FilePageStore filePageStore = mock(FilePageStore.class); DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class); @@ -138,19 +134,16 @@ void testDoCompaction() { assertSame(filePageStore, answer.getArgument(0)); assertSame(deltaFilePageStoreIo, answer.getArgument(1)); - ((CompletableFuture) answer.getArgument(2)).complete(null); - return null; }) .when(compactor) - .mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class), any(CompletableFuture.class)); + .mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class)); compactor.doCompaction(); verify(filePageStore, times(1)).getDeltaFileToCompaction(); - verify(compactor, times(1)) - .mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class), any(CompletableFuture.class)); + verify(compactor, times(1)).mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class)); } @Test From f48eebb44c0e62a63c351939cdd44922f44f0c69 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 3 Aug 2022 14:05:47 +0300 Subject: [PATCH 07/10] IGNITE-16657 wip --- .../persistence/checkpoint/CheckpointManager.java | 9 +++++++++ .../pagememory/PersistentPageMemoryTableStorage.java | 6 ++++++ 2 files changed, 15 insertions(+) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java index dc59c650ad47..57e764058297 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java @@ -302,4 +302,13 @@ static int[] pageIndexesForDeltaFilePageStore(CheckpointDirtyPagesView partition return pageIndexes; } + + /** + * Adds the number of delta files to compact. + * + * @param count Number of delta files. + */ + public void addDeltaFileCountForCompaction(int count) { + compactor.addDeltaFiles(count); + } } diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java index 5299301230cc..593ce5c59bd9 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java @@ -72,6 +72,12 @@ public void start() throws StorageException { try { dataRegion.filePageStoreManager().initialize(tableView.name(), tableView.tableId(), tableView.partitions()); + + int deltaFileCount = dataRegion.filePageStoreManager().getStores(tableView.tableId()).stream() + .mapToInt(FilePageStore::deltaFileCount) + .sum(); + + dataRegion.checkpointManager().addDeltaFileCountForCompaction(deltaFileCount); } catch (IgniteInternalCheckedException e) { throw new StorageException("Error initializing file page stores for table: " + tableView.name(), e); } From 1c00eae434e2cd679ffe9367704fcecdaf5c5d93 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 3 Aug 2022 14:21:17 +0300 Subject: [PATCH 08/10] IGNITE-16657 wip --- .../checkpoint/CheckpointPagesWriter.java | 13 ++- .../CheckpointPagesWriterFactory.java | 6 +- .../store/FilePageStoreManager.java | 41 ++++++-- .../store/PageReadWriteManagerImpl.java | 97 ------------------- .../store/PageReadWriteManagerImplTest.java | 84 ---------------- 5 files changed, 44 insertions(+), 197 deletions(-) delete mode 100644 modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java delete mode 100644 modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java index 140cbb4b9f91..29adbde0f13e 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java @@ -75,9 +75,8 @@ public class CheckpointPagesWriter implements Runnable { /** Future which should be finished when all pages would be written. */ private final CompletableFuture doneFut; - /** Some action which will be executed every time before page will be written. */ - // TODO: IGNITE-16657 переименовать в обновление хертбитов - private final Runnable beforePageWrite; + /** Update heartbeat callback. */ + private final Runnable updateHeartbeat; /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */ private final ThreadLocal threadBuf; @@ -104,7 +103,7 @@ public class CheckpointPagesWriter implements Runnable { * @param writePageIds Queue of dirty page IDs to write. * @param updatedPartitions Updated partitions. * @param doneFut Done future. - * @param beforePageWrite Action to be performed before every page write. + * @param updateHeartbeat Update heartbeat callback. * @param log Logger. * @param threadBuf Thread local byte buffer. * @param checkpointProgress Checkpoint progress. @@ -119,7 +118,7 @@ public class CheckpointPagesWriter implements Runnable { IgniteConcurrentMultiPairQueue writePageIds, ConcurrentMap updatedPartitions, CompletableFuture doneFut, - Runnable beforePageWrite, + Runnable updateHeartbeat, ThreadLocal threadBuf, CheckpointProgressImpl checkpointProgress, WriteDirtyPage pageWriter, @@ -132,7 +131,7 @@ public class CheckpointPagesWriter implements Runnable { this.writePageIds = writePageIds; this.updatedPartitions = updatedPartitions; this.doneFut = doneFut; - this.beforePageWrite = beforePageWrite; + this.updateHeartbeat = updateHeartbeat; this.threadBuf = threadBuf; this.checkpointProgress = checkpointProgress; this.pageWriter = pageWriter; @@ -184,7 +183,7 @@ private IgniteConcurrentMultiPairQueue writePa AtomicBoolean writeMetaPage = new AtomicBoolean(); while (!shutdownNow.getAsBoolean() && writePageIds.next(queueResult)) { - beforePageWrite.run(); + updateHeartbeat.run(); FullPageId fullId = queueResult.getValue(); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java index 831cbc0e49d7..ee8d15b20675 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java @@ -91,7 +91,7 @@ public class CheckpointPagesWriterFactory { * @param dirtyPageIdQueue Checkpoint dirty page ID queue to write. * @param updatedPartitions Updated partitions. * @param doneWriteFut Write done future. - * @param beforePageWrite Before page write callback. + * @param updateHeartbeat Update heartbeat callback. * @param checkpointProgress Current checkpoint data. * @param shutdownNow Checker of stop operation. */ @@ -100,7 +100,7 @@ CheckpointPagesWriter build( IgniteConcurrentMultiPairQueue dirtyPageIdQueue, ConcurrentMap updatedPartitions, CompletableFuture doneWriteFut, - Runnable beforePageWrite, + Runnable updateHeartbeat, CheckpointProgressImpl checkpointProgress, // TODO: IGNITE-16993 Consider a lock replacement BooleanSupplier shutdownNow @@ -111,7 +111,7 @@ CheckpointPagesWriter build( dirtyPageIdQueue, updatedPartitions, doneWriteFut, - beforePageWrite, + updateHeartbeat, threadBuf, checkpointProgress, dirtyPageWriter, diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java index d2a28abac6d8..70a15608c153 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java @@ -21,6 +21,8 @@ import static java.util.Collections.unmodifiableList; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID; +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId; import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer; import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer; import static org.apache.ignite.internal.util.IgniteUtils.closeAll; @@ -82,9 +84,6 @@ public class FilePageStoreManager implements PageReadWriteManager { /** Page size in bytes. */ private final int pageSize; - /** Page read write manager. */ - private final PageReadWriteManager pageReadWriteManager = new PageReadWriteManagerImpl(this); - /** * Executor to disallow running code that modifies data in {@link #groupPageStores} concurrently with cleanup of file page store. */ @@ -180,7 +179,15 @@ public void stop() throws Exception { /** {@inheritDoc} */ @Override public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { - pageReadWriteManager.read(grpId, pageId, pageBuf, keepCrc); + FilePageStore pageStore = getStore(grpId, partitionId(pageId)); + + try { + pageStore.read(pageId, pageBuf, keepCrc); + } catch (IgniteInternalCheckedException e) { + // TODO: IGNITE-16899 By analogy with 2.0, fail a node + + throw e; + } } /** {@inheritDoc} */ @@ -191,13 +198,35 @@ public PageStore write( ByteBuffer pageBuf, boolean calculateCrc ) throws IgniteInternalCheckedException { - return pageReadWriteManager.write(grpId, pageId, pageBuf, calculateCrc); + FilePageStore pageStore = getStore(grpId, partitionId(pageId)); + + try { + pageStore.write(pageId, pageBuf, calculateCrc); + } catch (IgniteInternalCheckedException e) { + // TODO: IGNITE-16899 By analogy with 2.0, fail a node + + throw e; + } + + return pageStore; } /** {@inheritDoc} */ @Override public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException { - return pageReadWriteManager.allocatePage(grpId, partId, flags); + assert partId >= 0 && partId <= MAX_PARTITION_ID : partId; + + FilePageStore pageStore = getStore(grpId, partId); + + try { + int pageIdx = pageStore.allocatePage(); + + return pageId(partId, flags, pageIdx); + } catch (IgniteInternalCheckedException e) { + // TODO: IGNITE-16899 By analogy with 2.0, fail a node + + throw e; + } } /** diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java deleted file mode 100644 index d66fa993ba4e..000000000000 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.pagememory.persistence.store; - -import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID; -import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; -import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId; - -import java.nio.ByteBuffer; -import org.apache.ignite.internal.tostring.IgniteToStringExclude; -import org.apache.ignite.lang.IgniteInternalCheckedException; - -/** - * {@link org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager} implementation. - */ -// TODO: IGNITE-16657 рассмотреть его удаление -class PageReadWriteManagerImpl implements org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager { - @IgniteToStringExclude - protected final FilePageStoreManager filePageStoreManager; - - /** - * Constructor. - * - * @param filePageStoreManager File page store manager. - */ - public PageReadWriteManagerImpl(FilePageStoreManager filePageStoreManager) { - this.filePageStoreManager = filePageStoreManager; - } - - /** {@inheritDoc} */ - @Override - public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException { - FilePageStore pageStore = filePageStoreManager.getStore(grpId, partitionId(pageId)); - - try { - pageStore.read(pageId, pageBuf, keepCrc); - } catch (IgniteInternalCheckedException e) { - // TODO: IGNITE-16899 By analogy with 2.0, fail a node - - throw e; - } - } - - /** {@inheritDoc} */ - @Override - public PageStore write( - int grpId, - long pageId, - ByteBuffer pageBuf, - boolean calculateCrc - ) throws IgniteInternalCheckedException { - FilePageStore pageStore = filePageStoreManager.getStore(grpId, partitionId(pageId)); - - try { - pageStore.write(pageId, pageBuf, calculateCrc); - } catch (IgniteInternalCheckedException e) { - // TODO: IGNITE-16899 By analogy with 2.0, fail a node - - throw e; - } - - return pageStore; - } - - /** {@inheritDoc} */ - @Override - public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException { - assert partId >= 0 && partId <= MAX_PARTITION_ID : partId; - - FilePageStore pageStore = filePageStoreManager.getStore(grpId, partId); - - try { - int pageIdx = pageStore.allocatePage(); - - return pageId(partId, flags, pageIdx); - } catch (IgniteInternalCheckedException e) { - // TODO: IGNITE-16899 By analogy with 2.0, fail a node - - throw e; - } - } -} diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java deleted file mode 100644 index 723669d97d59..000000000000 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.pagememory.persistence.store; - -import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.nio.ByteBuffer; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** - * For {@link PageReadWriteManagerImpl} testing. - */ -public class PageReadWriteManagerImplTest { - private FilePageStore filePageStore; - - private FilePageStoreManager filePageStoreManager; - - private PageReadWriteManagerImpl pageReadWriteManager; - - @BeforeEach - void setUp() throws Exception { - filePageStore = mock(FilePageStore.class); - - filePageStoreManager = mock(FilePageStoreManager.class); - - when(filePageStoreManager.getStore(0, 0)).thenReturn(filePageStore); - - pageReadWriteManager = new PageReadWriteManagerImpl(filePageStoreManager); - } - - @Test - void testRead() throws Exception { - long pageId = pageId(0, (byte) 0, 0); - - ByteBuffer pageBuffer = mock(ByteBuffer.class); - - pageReadWriteManager.read(0, pageId, pageBuffer, true); - - verify(filePageStore, times(1)).read(pageId, pageBuffer, true); - } - - @Test - void testWrite() throws Exception { - long pageId = pageId(0, (byte) 0, 0); - - ByteBuffer pageBuffer = mock(ByteBuffer.class); - - assertSame(filePageStore, pageReadWriteManager.write(0, pageId, pageBuffer, true)); - - verify(filePageStore, times(1)).write(pageId, pageBuffer, true); - } - - @Test - void testAllocatePage() throws Exception { - assertEquals( - pageId(0, (byte) 1, 0), - pageReadWriteManager.allocatePage(0, 0, (byte) 1) - ); - - verify(filePageStore, times(1)).allocatePage(); - } -} From 165e1f87fd8aba5aa5a8dbc91750b0848c50a4aa Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 3 Aug 2022 15:31:16 +0300 Subject: [PATCH 09/10] IGNITE-16657 wip --- .../persistence/compaction/Compactor.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java index c3649b21d99f..f2f726936630 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -48,9 +48,19 @@ import org.jetbrains.annotations.Nullable; /** - * Empty. + * Entity to compact delta files. + * + *

To start compacting delta files, you need to notify about the appearance of {@link #addDeltaFiles(int) delta files ready for + * compaction}. Then all delta files {@link FilePageStore#getDeltaFileToCompaction() ready for compaction} will be collected and merged with + * their {@link FilePageStore file page stores} until all delta files are compacted. + * + *

Delta file compaction process consists of: + *

    + *
  • Copying pages from a delta file to a partition file.
  • + *
  • Fsync of the partition file.
  • + *
  • Remove delta file from {@link FilePageStore} and file system.
  • + *
*/ -// TODO: IGNITE-16657 добавить описание public class Compactor extends IgniteWorker { private final Object mux = new Object(); From 183991ee633a0407da14a3e84232db3752322553 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Thu, 4 Aug 2022 08:19:31 +0300 Subject: [PATCH 10/10] IGNITE-16657 after review #1.0 --- .../persistence/checkpoint/Checkpointer.java | 8 +---- .../persistence/compaction/Compactor.java | 16 ++++------ .../store/DeltaFilePageStoreIo.java | 32 +++++++++++-------- .../persistence/compaction/CompactorTest.java | 2 +- .../store/DeltaFilePageStoreIoTest.java | 29 ++++++++++------- 5 files changed, 44 insertions(+), 43 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java index ed329cb1878a..1725816ddb1c 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java @@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.LongAdder; @@ -412,12 +411,7 @@ boolean writePages( if (pageWritePool == null) { write.run(); } else { - try { - pageWritePool.execute(write); - } catch (RejectedExecutionException ignore) { - // Run the task synchronously. - write.run(); - } + pageWritePool.execute(write); } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java index f2f726936630..bf9f285ee8f7 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -28,7 +28,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -228,12 +227,7 @@ void doCompaction() { if (threadPoolExecutor == null) { merger.run(); } else { - try { - threadPoolExecutor.execute(merger); - } catch (RejectedExecutionException ignore) { - // Run the task synchronously. - merger.run(); - } + threadPoolExecutor.execute(merger); } } @@ -319,14 +313,18 @@ void mergeDeltaFileToMainFile( // Copy pages deltaFilePageStore -> filePageStore. ByteBuffer buffer = threadBuf.get(); - for (long pageOffset : deltaFilePageStore.pageOffsets()) { + for (long pageIndex : deltaFilePageStore.pageIndexes()) { updateHeartbeat(); if (isCancelled()) { return; } - boolean read = deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageOffset, pageOffset, buffer.rewind(), false); + long pageOffset = deltaFilePageStore.pageOffset(pageIndex); + + // pageIndex instead of pageId, only for debugging in case of errors + // since we do not know the pageId until we read it from the pageOffset. + boolean read = deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageIndex, pageOffset, buffer.rewind(), false); assert read : deltaFilePageStore.filePath(); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java index eb134255a4d7..7d5256a16944 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Path; -import java.util.stream.IntStream; import org.apache.ignite.internal.fileio.FileIo; import org.apache.ignite.internal.fileio.FileIoFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -101,17 +100,22 @@ public void checkHeader(FileIo fileIo) throws IOException { */ @Override public long pageOffset(long pageId) { - int searchResult = binarySearch(header.pageIndexes(), pageIndex(pageId)); + return pageOffset(pageIndex(pageId)); + } + + /** + * Returns page offset within the store file, {@code -1} if page not found in delta file. + * + * @param pageIdx Page index. + */ + public long pageOffset(int pageIdx) { + int searchResult = binarySearch(header.pageIndexes(), pageIdx); if (searchResult < 0) { return -1; } - return pageOffset(searchResult); - } - - private long pageOffset(int pagePosition) { - return (long) pagePosition * pageSize() + headerSize(); + return (long) searchResult * pageSize() + headerSize(); } /** @@ -150,13 +154,6 @@ public int fileIndex() { return header.index(); } - /** - * Returns page offsets within the store file. - */ - public long[] pageOffsets() { - return IntStream.of(header.pageIndexes()).mapToLong(this::pageOffset).toArray(); - } - /** * Marks that the delta file page store has been merged with the file page store. * @@ -166,4 +163,11 @@ public long[] pageOffsets() { public void markMergedToFilePageStore() { mergedBusyLock.block(); } + + /** + * Returns page indexes of the delta file page store. + */ + public int[] pageIndexes() { + return header.pageIndexes(); + } } diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java index 39d8035af208..d065efcda943 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java @@ -93,7 +93,7 @@ void testMergeDeltaFileToMainFile() throws Throwable { when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true); - when(deltaFilePageStoreIo.pageOffsets()).thenReturn(new long[]{0}); + when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0}); when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean())) .then(answer -> { diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java index 7760f8b12241..cebf1bd4a957 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java @@ -57,11 +57,19 @@ void testPageOffset() throws Exception { try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(testFilePath, header)) { assertEquals(PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 0))); + assertEquals(PAGE_SIZE, filePageStoreIo.pageOffset(0)); + assertEquals(2 * PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 1))); + assertEquals(2 * PAGE_SIZE, filePageStoreIo.pageOffset(1)); + assertEquals(3 * PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 2))); + assertEquals(3 * PAGE_SIZE, filePageStoreIo.pageOffset(2)); assertEquals(-1, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 3))); + assertEquals(-1, filePageStoreIo.pageOffset(3)); + assertEquals(-1, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 4))); + assertEquals(-1, filePageStoreIo.pageOffset(4)); } } @@ -141,18 +149,6 @@ void testCheckHeader() throws Exception { } } - @Test - void testPageOffsets() throws Exception { - DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2)); - - try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(workDir.resolve("test"), header)) { - assertArrayEquals( - IntStream.of(header.pageIndexes()).mapToLong(index -> ((long) index * PAGE_SIZE) + header.headerSize()).toArray(), - filePageStoreIo.pageOffsets() - ); - } - } - @Test void testMergedToFilePageStore() throws Exception { DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2)); @@ -189,6 +185,15 @@ void testMergedToFilePageStore() throws Exception { } } + @Test + void testPageIndexes() throws Exception { + DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2)); + + try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(workDir.resolve("test"), header)) { + assertArrayEquals(arr(0, 1, 2), filePageStoreIo.pageIndexes()); + } + } + /** {@inheritDoc} */ @Override DeltaFilePageStoreIo createFilePageStoreIo(Path filePath) {